3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative media (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be downloaded.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses need to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. Moreover, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
/* The debug category is only declared here (extern); it is initialised in
 * gst_adaptive_demux_class_init() and used as this file's default category. */
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
115 #define DEFAULT_FAILED_COUNT 3
/* Defaults for the connection/bitrate properties installed in class_init and
 * assigned to the instance fields in init. */
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
/* Defaults for the buffering properties. The time-based ones are expressed as
 * multiples of GST_SECOND; the matching property blurbs document them in ns. */
122 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0 /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
/* Initial values of the read-only buffered-level properties. */
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
/* Accessors for the API lock kept in the demuxer's private data.
 *
 * GST_API_GET_LOCK yields a pointer to the GMutex; GST_API_LOCK and
 * GST_API_UNLOCK lock/unlock it. The lock/unlock macros expand to a single
 * expression WITHOUT a trailing semicolon: the caller's own `;` terminates
 * the statement, which keeps them usable in unbraced if/else bodies and
 * avoids stray empty statements. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))
/* Property identifiers; each one listed here is installed as a GObject
 * property in gst_adaptive_demux_class_init(). */
139 PROP_CONNECTION_SPEED,
140 PROP_BANDWIDTH_TARGET_RATIO,
141 PROP_CONNECTION_BITRATE,
144 PROP_CURRENT_BANDWIDTH,
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
/* Read-only: only handled in get_property. */
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
/* Source pad templates, one per stream type. Pad names carry a two-digit
 * index ("video_%02u", "audio_%02u", "subtitle_%02u") and the caps are
 * unrestricted (ANY). The templates are registered on the element class in
 * class_init and instantiated in gst_adaptive_demux_output_slot_new(). */
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
173 /* Private structure for a track being outputted */
/* Slots are allocated in gst_adaptive_demux_output_slot_new() and released in
 * gst_adaptive_demux_output_slot_free(). */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
/* The reference is dropped when the slot is freed. */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
/* May be NULL; unreffed on slot free only when set. */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
/* Starts out FALSE for a newly created slot. */
192 gboolean pushed_timed_data;
/* Parent class pointer, filled in by class_init via g_type_class_peek_parent()
 * and used to chain up in finalize and change_state. */
195 static GstBinClass *parent_class = NULL;
/* Offset used by gst_adaptive_demux_get_instance_private() to locate the
 * private data; adjusted in class_init when non-zero. */
196 static gint private_offset = 0;
/* Forward declarations: GObject type machinery. */
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
/* Forward declarations: GstElement / GstBin virtual method implementations
 * (installed on the class in class_init). */
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
/* Forward declarations: pad functions. The sink ones are installed on the
 * sink pad in init, the src ones on each output slot pad. */
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214 GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222 * demux, GstEvent * event);
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
/* Forward declarations: internal helpers and default vfunc implementations. */
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230 gboolean first_and_live);
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242 gboolean stop_updates);
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249 * method to get to the padtemplates */
/* Registers the type by hand: "GstAdaptiveDemux2", an abstract subtype of
 * GST_TYPE_BIN, with class_init/init wired through GTypeInfo. */
251 gst_adaptive_demux_ng_get_type (void)
253 static gsize type = 0;
/* g_once_init_enter()/g_once_init_leave() guard the registration so it runs
 * only once. */
255 if (g_once_init_enter (&type)) {
257 static const GTypeInfo info = {
258 sizeof (GstAdaptiveDemuxClass),
261 (GClassInitFunc) gst_adaptive_demux_class_init,
264 sizeof (GstAdaptiveDemux),
266 (GInstanceInitFunc) gst_adaptive_demux_init,
269 _type = g_type_register_static (GST_TYPE_BIN,
270 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
/* NOTE(review): the returned offset is presumably what ends up in
 * private_offset; the assignment target is not on this line - confirm. */
273 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
275 g_once_init_leave (&type, _type);
/* Returns the instance-private data of @self, computed as the address
 * private_offset bytes away from the instance pointer. */
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
283 return (G_STRUCT_MEMBER_P (self, private_offset));
/* GObject set_property handler: copies the new value from @value into the
 * matching field of the demuxer. The object lock is taken before the
 * property dispatch and released after it. Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288 const GValue * value, GParamSpec * pspec)
290 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
292 GST_OBJECT_LOCK (demux);
/* "connection-speed" is exposed in kbps and stored multiplied by 1000 in
 * connection_speed, the same field "connection-bitrate" writes unscaled. */
295 case PROP_CONNECTION_SPEED:
296 demux->connection_speed = g_value_get_uint (value) * 1000;
297 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298 demux->connection_speed);
300 case PROP_BANDWIDTH_TARGET_RATIO:
301 demux->bandwidth_target_ratio = g_value_get_float (value);
303 case PROP_MIN_BITRATE:
304 demux->min_bitrate = g_value_get_uint (value);
306 case PROP_MAX_BITRATE:
307 demux->max_bitrate = g_value_get_uint (value);
309 case PROP_CONNECTION_BITRATE:
310 demux->connection_speed = g_value_get_uint (value);
312 /* FIXME: Recalculate track and buffering levels
313 * when watermarks change? */
314 case PROP_MAX_BUFFERING_TIME:
315 demux->max_buffering_time = g_value_get_uint64 (value);
317 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
320 case PROP_BUFFERING_LOW_WATERMARK_TIME:
321 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
323 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324 demux->buffering_high_watermark_fragments = g_value_get_double (value);
326 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327 demux->buffering_low_watermark_fragments = g_value_get_double (value);
330 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
334 GST_OBJECT_UNLOCK (demux);
/* GObject get_property handler: stores the current value of the requested
 * property into @value. The object lock is taken before the property
 * dispatch and released after it. Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339 GValue * value, GParamSpec * pspec)
341 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
343 GST_OBJECT_LOCK (demux);
/* connection_speed is reported divided by 1000 for "connection-speed" (kbps)
 * and unscaled for "connection-bitrate". */
346 case PROP_CONNECTION_SPEED:
347 g_value_set_uint (value, demux->connection_speed / 1000);
349 case PROP_BANDWIDTH_TARGET_RATIO:
350 g_value_set_float (value, demux->bandwidth_target_ratio);
352 case PROP_MIN_BITRATE:
353 g_value_set_uint (value, demux->min_bitrate);
355 case PROP_MAX_BITRATE:
356 g_value_set_uint (value, demux->max_bitrate);
358 case PROP_CONNECTION_BITRATE:
359 g_value_set_uint (value, demux->connection_speed);
/* Read-only property backed by current_download_rate. */
361 case PROP_CURRENT_BANDWIDTH:
362 g_value_set_uint (value, demux->current_download_rate);
364 case PROP_MAX_BUFFERING_TIME:
365 g_value_set_uint64 (value, demux->max_buffering_time);
367 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
370 case PROP_BUFFERING_LOW_WATERMARK_TIME:
371 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
373 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374 g_value_set_double (value, demux->buffering_high_watermark_fragments);
376 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377 g_value_set_double (value, demux->buffering_low_watermark_fragments);
/* Read-only buffered-level properties. */
379 case PROP_CURRENT_LEVEL_TIME_VIDEO:
380 g_value_set_uint64 (value, demux->current_level_time_video);
382 case PROP_CURRENT_LEVEL_TIME_AUDIO:
383 g_value_set_uint64 (value, demux->current_level_time_audio);
386 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
390 GST_OBJECT_UNLOCK (demux);
/* Class initialiser: sets up the debug category, records the parent class,
 * installs the GObject properties, registers the three src pad templates and
 * hooks up the GObject / GstElement / GstBin virtual methods plus the default
 * implementations of the subclass vfuncs. */
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
396 GObjectClass *gobject_class;
397 GstElementClass *gstelement_class;
398 GstBinClass *gstbin_class;
400 gobject_class = G_OBJECT_CLASS (klass);
401 gstelement_class = GST_ELEMENT_CLASS (klass);
402 gstbin_class = GST_BIN_CLASS (klass);
404 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405 "Base Adaptive Demux (ng)");
407 parent_class = g_type_class_peek_parent (klass);
/* The type is registered by hand in gst_adaptive_demux_ng_get_type(), so the
 * private offset is adjusted here explicitly. */
409 if (private_offset != 0)
410 g_type_class_adjust_private_offset (klass, &private_offset);
412 gobject_class->set_property = gst_adaptive_demux_set_property;
413 gobject_class->get_property = gst_adaptive_demux_get_property;
414 gobject_class->finalize = gst_adaptive_demux_finalize;
/* "connection-speed" is in kbps, hence the range and default are the
 * bits/s values divided by 1000. */
416 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417 g_param_spec_uint ("connection-speed", "Connection Speed",
418 "Network connection speed to use in kbps (0 = calculate from downloaded"
419 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423 g_param_spec_float ("bandwidth-target-ratio",
424 "Ratio of target bandwidth / available bandwidth",
425 "Limit of the available bitrate to use when switching to alternates",
426 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431 "Network connection speed to use (0 = automatic) (bits/s)",
432 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437 "Minimum bitrate to use when switching to alternates (bits/s)",
438 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443 "Maximum bitrate to use when switching to alternates (bits/s)",
444 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Read-only report of the measured download rate. */
447 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448 g_param_spec_uint ("current-bandwidth",
449 "Current download bandwidth (bits/s)",
450 "Report of current download bandwidth (based on arriving data) (bits/s)",
451 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/* Buffering limits and watermarks; these are flagged
 * GST_PARAM_MUTABLE_PLAYING. */
453 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454 g_param_spec_uint64 ("max-buffering-time",
455 "Buffering maximum size (ns)",
456 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459 G_PARAM_STATIC_STRINGS));
461 g_object_class_install_property (gobject_class,
462 PROP_BUFFERING_HIGH_WATERMARK_TIME,
463 g_param_spec_uint64 ("high-watermark-time",
464 "High buffering watermark size (ns)",
465 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_LOW_WATERMARK_TIME,
472 g_param_spec_uint64 ("low-watermark-time",
473 "Low buffering watermark size (ns)",
474 "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
/* NOTE(review): G_PARAM_READWRITE appears twice in the flags of the two
 * fragment-watermark properties below; the duplicate is redundant. */
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481 g_param_spec_double ("high-watermark-fragments",
482 "High buffering watermark size (fragments)",
483 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("low-watermark-fragments",
491 "Low buffering watermark size (fragments)",
492 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
/* Read-only reports of the currently buffered video/audio level. */
497 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498 g_param_spec_uint64 ("current-level-time-video",
499 "Currently buffered level of video (ns)",
500 "Currently buffered level of video track(s) (ns)",
501 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503 G_PARAM_STATIC_STRINGS));
505 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506 g_param_spec_uint64 ("current-level-time-audio",
507 "Currently buffered level of audio (ns)",
508 "Currently buffered level of audio track(s) (ns)",
509 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511 G_PARAM_STATIC_STRINGS));
/* Source pad templates (audio, video, subtitle). */
513 gst_element_class_add_static_pad_template (gstelement_class,
514 &gst_adaptive_demux_audiosrc_template);
515 gst_element_class_add_static_pad_template (gstelement_class,
516 &gst_adaptive_demux_videosrc_template);
517 gst_element_class_add_static_pad_template (gstelement_class,
518 &gst_adaptive_demux_subtitlesrc_template);
/* Element and bin virtual methods. */
520 gstelement_class->change_state = gst_adaptive_demux_change_state;
521 gstelement_class->query = gst_adaptive_demux_query;
522 gstelement_class->send_event = gst_adaptive_demux_send_event;
524 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
/* Default implementations of the subclass vfuncs. */
526 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527 klass->requires_periodical_playlist_update =
528 gst_adaptive_demux_requires_periodical_playlist_update_default;
529 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
/* Instance initialiser: creates the private helpers (input adapter, realtime
 * clock, download helper, scheduler loop), initialises the locks, builds the
 * sink pad from the class "sink" template, applies the property defaults and
 * creates the output task. @klass is needed to look up the pad template. */
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534 GstAdaptiveDemuxClass * klass)
536 GstPadTemplate *pad_template;
538 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
540 demux->priv = gst_adaptive_demux_get_instance_private (demux);
541 demux->priv->input_adapter = gst_adapter_new ();
542 demux->realtime_clock = gst_adaptive_demux_clock_new ();
/* The download helper is created with the realtime clock made just above. */
544 demux->download_helper = downloadhelper_new (demux->realtime_clock);
545 demux->priv->segment_seqnum = gst_util_seqnum_next ();
546 demux->have_group_id = FALSE;
547 demux->group_id = G_MAXUINT;
549 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550 demux->instant_rate_multiplier = 1.0;
/* Mark this bin as streams-aware and suppress the source/sink element flags
 * on it. */
552 GST_OBJECT_FLAG_SET (demux, GST_BIN_FLAG_STREAMS_AWARE);
553 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
554 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
/* manifest_lock is a recursive mutex; the other locks are plain mutexes. */
556 g_rec_mutex_init (&demux->priv->manifest_lock);
558 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
560 g_mutex_init (&demux->priv->api_lock);
561 g_mutex_init (&demux->priv->segment_lock);
563 g_mutex_init (&demux->priv->tracks_lock);
564 g_cond_init (&demux->priv->tracks_add);
566 g_mutex_init (&demux->priv->buffering_lock);
568 demux->priv->periods = g_queue_new ();
571 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
572 g_return_if_fail (pad_template != NULL);
/* Sink pad, with the event and chain functions of this file. */
574 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
575 gst_pad_set_event_function (demux->sinkpad,
576 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
577 gst_pad_set_chain_function (demux->sinkpad,
578 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
/* Property defaults. */
581 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
582 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
583 demux->min_bitrate = DEFAULT_MIN_BITRATE;
584 demux->max_bitrate = DEFAULT_MAX_BITRATE;
586 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
587 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
588 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
589 demux->buffering_high_watermark_fragments =
590 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
591 demux->buffering_low_watermark_fragments =
592 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
594 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
595 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
597 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
599 demux->priv->duration = GST_CLOCK_TIME_NONE;
601 /* Output combiner */
602 demux->priv->flowcombiner = gst_flow_combiner_new ();
/* Output task running gst_adaptive_demux_output_loop(), using the recursive
 * output_lock as its task lock. */
605 g_rec_mutex_init (&demux->priv->output_lock);
606 demux->priv->output_task =
607 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
609 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
613 gst_adaptive_demux_finalize (GObject * object)
615 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
616 GstAdaptiveDemuxPrivate *priv = demux->priv;
618 GST_DEBUG_OBJECT (object, "finalize");
620 g_object_unref (priv->input_adapter);
622 downloadhelper_free (demux->download_helper);
624 g_rec_mutex_clear (&demux->priv->manifest_lock);
625 g_mutex_clear (&demux->priv->api_lock);
626 g_mutex_clear (&demux->priv->segment_lock);
628 g_mutex_clear (&demux->priv->buffering_lock);
630 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
632 /* The input period is present after a reset, clear it now */
633 if (demux->input_period)
634 gst_adaptive_demux_period_unref (demux->input_period);
636 if (demux->realtime_clock) {
637 gst_adaptive_demux_clock_unref (demux->realtime_clock);
638 demux->realtime_clock = NULL;
640 g_object_unref (priv->output_task);
641 g_rec_mutex_clear (&priv->output_lock);
643 gst_flow_combiner_free (priv->flowcombiner);
645 g_queue_free (priv->periods);
647 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Checks whether the element's parent object has GST_BIN_FLAG_STREAMS_AWARE
 * set. The result defaults to FALSE; the parent reference obtained from
 * gst_object_get_parent() is released again after the flag test. */
651 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
653 gboolean ret = FALSE;
654 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
657 ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
658 gst_object_unref (parent);
/* GstElement::change_state implementation.
 *
 * Work done before chaining up to the parent class:
 *  - NULL->READY: posts an element error if the parent is not streams-aware.
 *  - PAUSED->READY: clears the running flag, stops the scheduler loop and the
 *    download helper, flags flushing, stops and joins the output task, then
 *    resets the demuxer under the API lock.
 *  - READY->PAUSED: resets the demuxer, starts the scheduler loop (and the
 *    manifest update task when a manifest is already known), then sets the
 *    running flag.
 * After chaining up, READY->PAUSED starts the download helper. */
664 static GstStateChangeReturn
665 gst_adaptive_demux_change_state (GstElement * element,
666 GstStateChange transition)
668 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
669 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
671 switch (transition) {
672 case GST_STATE_CHANGE_NULL_TO_READY:
673 if (!gst_adaptive_demux_check_streams_aware (demux)) {
674 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
675 (_("Element requires a streams-aware context.")), (NULL));
679 case GST_STATE_CHANGE_PAUSED_TO_READY:
/* running is flipped atomically; the debug line is only emitted when this
 * call actually performed the TRUE -> FALSE change. */
680 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
681 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
683 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
684 downloadhelper_stop (demux->download_helper);
/* Set flushing and signal tracks_add (presumably to wake an output loop
 * waiting on it - confirm) before asking the task to stop. */
687 demux->priv->flushing = TRUE;
688 g_cond_signal (&demux->priv->tracks_add);
689 gst_task_stop (demux->priv->output_task);
690 TRACKS_UNLOCK (demux);
/* The join happens after the tracks lock has been released. */
692 gst_task_join (demux->priv->output_task);
694 GST_API_LOCK (demux);
695 gst_adaptive_demux_reset (demux);
696 GST_API_UNLOCK (demux);
698 case GST_STATE_CHANGE_READY_TO_PAUSED:
699 GST_API_LOCK (demux);
700 gst_adaptive_demux_reset (demux);
702 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
/* A manifest may already be known at this point; if so, resume updating it. */
703 if (g_atomic_int_get (&demux->priv->have_manifest))
704 gst_adaptive_demux_start_manifest_update_task (demux);
705 GST_API_UNLOCK (demux);
706 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
707 GST_DEBUG_OBJECT (demux, "demuxer has started running");
708 /* gst_task_start (demux->priv->output_task); */
714 /* this must be run with the scheduler and output tasks stopped. */
715 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
717 switch (transition) {
718 case GST_STATE_CHANGE_READY_TO_PAUSED:
719 /* Start download task */
720 downloadhelper_start (demux->download_helper);
/* Tears down an output slot: pushes an EOS event (carrying the current
 * segment seqnum) on the slot's pad, deactivates the pad, removes it from the
 * flow combiner and from the element, drops the track references held by the
 * slot and frees the slot itself. */
731 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
734 GstEvent *eos = gst_event_new_eos ();
735 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
737 /* FIXME: The slot might not have output any data, caps or segment yet */
738 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
739 gst_pad_push_event (slot->pad, eos);
740 gst_pad_set_active (slot->pad, FALSE);
741 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
742 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
/* slot->track is unreffed unconditionally; pending_track only when set. */
744 gst_adaptive_demux_track_unref (slot->track);
745 if (slot->pending_track)
746 gst_adaptive_demux_track_unref (slot->pending_track);
/* Matches the g_slice_new0() allocation in output_slot_new(). */
748 g_slice_free (OutputSlot, slot);
/* Creates an output slot for @streamtype together with its source pad.
 *
 * The pad name is built from the stream type and a per-type counter kept in
 * the private data (post-incremented on each call); the pad is created from
 * the matching static template. Only audio, video and text stream types are
 * handled; anything else hits g_assert_not_reached(). The new pad is added to
 * the element and to the flow combiner, activated, given the src query/event
 * functions, and the slot is stored as the pad's element-private data. */
752 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
753 GstStreamType streamtype)
756 GstPadTemplate *tmpl;
759 switch (streamtype) {
760 case GST_STREAM_TYPE_AUDIO:
761 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
763 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
765 case GST_STREAM_TYPE_VIDEO:
766 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
768 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
770 case GST_STREAM_TYPE_TEXT:
772 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
774 gst_static_pad_template_get
775 (&gst_adaptive_demux_subtitlesrc_template);
778 g_assert_not_reached ();
/* Zero-initialised allocation; released with g_slice_free() in
 * output_slot_free(). */
782 slot = g_slice_new0 (OutputSlot);
783 slot->type = streamtype;
784 slot->pushed_timed_data = FALSE;
786 /* Create and activate new pads */
787 slot->pad = gst_pad_new_from_template (tmpl, name);
/* The template reference obtained above is no longer needed. */
789 gst_object_unref (tmpl);
791 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
792 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
793 gst_pad_set_active (slot->pad, TRUE);
795 gst_pad_set_query_function (slot->pad,
796 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
797 gst_pad_set_event_function (slot->pad,
798 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
/* Lets pad callbacks get back to the slot from the pad. */
800 gst_pad_set_element_private (slot->pad, slot);
802 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
803 GST_DEBUG_PAD_NAME (slot->pad));
808 * * After `process_manifest` or when a period starts
809 * * Or when all tracks have been created
811 * Goes over tracks and creates the collection
813 * Returns TRUE if the collection was fully created.
815 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Builds a fresh GstStreamCollection for @period from its track list.
 *
 * Three conditions are checked (and logged) first: the tracks did not change,
 * the period has no tracks, or the period still has pending tracks. Otherwise
 * a new collection is created, the stream object of every track is added to
 * it, and it replaces period->collection (the previous one is unreffed). */
818 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
819 GstAdaptiveDemuxPeriod * period)
821 GstStreamCollection *collection;
824 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
826 if (!period->tracks_changed) {
827 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
831 if (!period->tracks) {
832 GST_WARNING_OBJECT (demux, "No tracks registered/present");
836 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
837 GST_DEBUG_OBJECT (demux,
838 "Streams still have pending tracks, not creating/updating collection");
842 /* Update collection */
843 collection = gst_stream_collection_new ("adaptivedemux");
845 for (iter = period->tracks; iter; iter = iter->next) {
846 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
848 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
/* An extra reference on the stream object is taken for the collection. */
849 gst_stream_collection_add_stream (collection,
850 gst_object_ref (track->stream_object));
/* Swap in the new collection, dropping the reference to the old one. */
853 if (period->collection)
854 gst_object_unref (period->collection);
855 period->collection = collection;
861 * Called for the output period:
862 * * after `update_collection()` if the input period is the same as the output period
863 * * When the output period changes
865 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Posts the stream collection of the current output period on the bus.
 *
 * The requested-selection seqnum is sampled up front. If it is unchanged
 * after the message has been posted, no stream selection was handled in the
 * meantime and the default tracks of the output period are selected. Finally,
 * if the demuxer is running, flushing is cleared and the output task is
 * started. */
868 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
870 GstStreamCollection *collection;
871 GstAdaptiveDemuxPeriod *period = demux->output_period;
872 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
874 g_return_val_if_fail (period, FALSE);
875 if (!period->collection) {
876 GST_DEBUG_OBJECT (demux, "No collection available yet");
880 collection = period->collection;
882 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
885 /* Post collection */
/* Both locks are released around the post; the manifest lock is re-taken
 * right after it.
 * NOTE(review): `collection` is used here without an extra reference while
 * the locks are dropped - confirm nothing can replace period->collection
 * concurrently. */
886 TRACKS_UNLOCK (demux);
887 GST_MANIFEST_UNLOCK (demux);
889 gst_element_post_message (GST_ELEMENT_CAST (demux),
890 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
892 GST_MANIFEST_LOCK (demux);
895 /* If no stream selection was handled, make a default selection */
896 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
897 gst_adaptive_demux_period_select_default_tracks (demux,
898 demux->output_period);
901 /* Make sure the output task is running */
902 if (gst_adaptive_demux2_is_running (demux)) {
903 demux->priv->flushing = FALSE;
904 GST_DEBUG_OBJECT (demux, "Starting the output task");
905 gst_task_start (demux->priv->output_task);
912 handle_incoming_manifest (GstAdaptiveDemux * demux)
914 GstAdaptiveDemuxClass *demux_class;
919 GstBuffer *manifest_buffer;
921 GST_API_LOCK (demux);
922 GST_MANIFEST_LOCK (demux);
924 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
926 available = gst_adapter_available (demux->priv->input_adapter);
929 goto eos_without_data;
931 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
933 /* Need to get the URI to use it as a base to generate the fragment's
935 query = gst_query_new_uri ();
936 query_res = gst_pad_peer_query (demux->sinkpad, query);
938 gchar *uri, *redirect_uri;
941 gst_query_parse_uri (query, &uri);
942 gst_query_parse_uri_redirection (query, &redirect_uri);
943 gst_query_parse_uri_redirection_permanent (query, &permanent);
945 if (permanent && redirect_uri) {
946 demux->manifest_uri = redirect_uri;
947 demux->manifest_base_uri = NULL;
950 demux->manifest_uri = uri;
951 demux->manifest_base_uri = redirect_uri;
954 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
955 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
957 if (!g_str_has_prefix (demux->manifest_uri, "data:")
958 && !g_str_has_prefix (demux->manifest_uri, "http://")
959 && !g_str_has_prefix (demux->manifest_uri, "https://")) {
960 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
961 (_("Invalid manifest URI")),
962 ("Manifest URI needs to use either data:, http:// or https://"));
963 gst_query_unref (query);
968 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
970 gst_query_unref (query);
972 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
973 if (!demux->have_group_id) {
974 demux->have_group_id = TRUE;
975 demux->group_id = gst_util_group_id_next ();
978 /* Let the subclass parse the manifest */
980 gst_adapter_take_buffer (demux->priv->input_adapter, available);
981 ret = demux_class->process_manifest (demux, manifest_buffer);
982 gst_buffer_unref (manifest_buffer);
984 gst_element_post_message (GST_ELEMENT_CAST (demux),
985 gst_message_new_element (GST_OBJECT_CAST (demux),
986 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
987 "manifest-uri", G_TYPE_STRING,
988 demux->manifest_uri, "uri", G_TYPE_STRING,
990 "manifest-download-start", GST_TYPE_CLOCK_TIME,
992 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
993 gst_util_get_timestamp (), NULL)));
996 goto invalid_manifest;
998 /* Streams should have been added to the input period if the manifest parsing
1000 if (!demux->input_period->streams)
1003 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1005 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1006 /* Send duration message */
1007 if (!gst_adaptive_demux_is_live (demux)) {
1008 GstClockTime duration = demux_class->get_duration (demux);
1010 demux->priv->duration = duration;
1011 if (duration != GST_CLOCK_TIME_NONE) {
1012 GST_DEBUG_OBJECT (demux,
1013 "Sending duration message : %" GST_TIME_FORMAT,
1014 GST_TIME_ARGS (duration));
1015 gst_element_post_message (GST_ELEMENT (demux),
1016 gst_message_new_duration_changed (GST_OBJECT (demux)));
1018 GST_DEBUG_OBJECT (demux,
1019 "media duration unknown, can not send the duration message");
1023 TRACKS_LOCK (demux);
1024 /* New streams/tracks will have been added to the input period */
1025 /* The input period has streams, make it the active output period */
1026 /* FIXME : Factorize this into a function to make a period active */
1027 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1028 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1029 gst_adaptive_demux_post_collection (demux);
1030 TRACKS_UNLOCK (demux);
1032 gst_adaptive_demux_prepare_streams (demux,
1033 gst_adaptive_demux_is_live (demux));
1034 gst_adaptive_demux_start_tasks (demux);
1035 gst_adaptive_demux_start_manifest_update_task (demux);
1038 GST_MANIFEST_UNLOCK (demux);
1039 GST_API_UNLOCK (demux);
1046 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1054 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1055 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1056 (_("This file contains no playable streams.")),
1057 ("No known stream formats found at the Manifest"));
1064 GST_MANIFEST_UNLOCK (demux);
1065 GST_API_UNLOCK (demux);
1067 /* In most cases, this will happen if we set a wrong url in the
1068 * source element and we have received the 404 HTML response instead of
1070 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1075 struct http_headers_collector
1077 GstAdaptiveDemux *demux;
1082 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1083 const GValue * value, gpointer userdata)
1085 struct http_headers_collector *hdr_data = userdata;
1086 GstAdaptiveDemux *demux = hdr_data->demux;
1087 const gchar *field_name = g_quark_to_string (field_id);
1089 if (G_UNLIKELY (value == NULL))
1090 return TRUE; /* This should not happen */
1092 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1093 const gchar *user_agent = g_value_get_string (value);
1095 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1096 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1099 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1100 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1101 guint i = 0, prev_len = 0, total_len = 0;
1102 gchar **cookies = NULL;
1104 if (hdr_data->cookies != NULL)
1105 prev_len = g_strv_length (hdr_data->cookies);
1107 if (GST_VALUE_HOLDS_ARRAY (value)) {
1108 total_len = gst_value_array_get_size (value) + prev_len;
1109 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1111 for (i = 0; i < gst_value_array_get_size (value); i++) {
1112 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1113 g_value_get_string (gst_value_array_get_value (value, i)));
1114 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1116 } else if (G_VALUE_HOLDS_STRING (value)) {
1117 total_len = 1 + prev_len;
1118 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1120 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1121 g_value_get_string (value));
1122 cookies[0] = g_value_dup_string (value);
1124 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1125 g_quark_to_string (field_id));
1131 for (j = 0; j < prev_len; j++) {
1132 GST_DEBUG_OBJECT (demux,
1133 "Append existing cookie %s", hdr_data->cookies[j]);
1134 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1137 cookies[total_len] = NULL;
1139 g_strfreev (hdr_data->cookies);
1140 hdr_data->cookies = cookies;
1144 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1145 const gchar *referer = g_value_get_string (value);
1146 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1148 downloadhelper_set_referer (demux->download_helper, referer);
1151 /* Date header can be used to estimate server offset */
1152 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1153 const gchar *http_date = g_value_get_string (value);
1156 GstDateTime *datetime =
1157 gst_adaptive_demux_util_parse_http_head_date (http_date);
1160 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1161 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1163 GST_INFO_OBJECT (demux,
1164 "HTTP response Date %s", GST_STR_NULL (date_string));
1165 g_free (date_string);
1167 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1169 g_date_time_unref (utc_now);
1170 gst_date_time_unref (datetime);
1179 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1182 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1185 switch (event->type) {
1186 case GST_EVENT_FLUSH_STOP:{
1187 GST_API_LOCK (demux);
1188 GST_MANIFEST_LOCK (demux);
1190 gst_adaptive_demux_reset (demux);
1192 ret = gst_pad_event_default (pad, parent, event);
1194 GST_MANIFEST_UNLOCK (demux);
1195 GST_API_UNLOCK (demux);
1201 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1202 if (!handle_incoming_manifest (demux)) {
1203 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1204 return gst_pad_event_default (pad, parent, event);
1206 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1208 GST_ERROR_OBJECT (demux,
1209 "Failed to acquire scheduler to handle manifest");
1210 return gst_pad_event_default (pad, parent, event);
1212 gst_event_unref (event);
1215 case GST_EVENT_STREAM_START:
1216 if (gst_event_parse_group_id (event, &demux->group_id))
1217 demux->have_group_id = TRUE;
1219 demux->have_group_id = FALSE;
1220 /* Swallow stream-start, we'll push our own */
1221 gst_event_unref (event);
1223 case GST_EVENT_SEGMENT:
1224 /* Swallow newsegments, we'll push our own */
1225 gst_event_unref (event);
1227 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1228 const GstStructure *structure = gst_event_get_structure (event);
1229 struct http_headers_collector c = { demux, NULL };
1231 if (gst_structure_has_name (structure, "http-headers")) {
1232 if (gst_structure_has_field (structure, "request-headers")) {
1233 GstStructure *req_headers = NULL;
1234 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1235 &req_headers, NULL);
1237 gst_structure_foreach (req_headers,
1238 gst_adaptive_demux_handle_upstream_http_header, &c);
1239 gst_structure_free (req_headers);
1242 if (gst_structure_has_field (structure, "response-headers")) {
1243 GstStructure *res_headers = NULL;
1244 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1245 &res_headers, NULL);
1247 gst_structure_foreach (res_headers,
1248 gst_adaptive_demux_handle_upstream_http_header, &c);
1249 gst_structure_free (res_headers);
1254 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1262 return gst_pad_event_default (pad, parent, event);
1265 static GstFlowReturn
1266 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1269 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1271 GST_MANIFEST_LOCK (demux);
1273 gst_adapter_push (demux->priv->input_adapter, buffer);
1275 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1276 (gint) gst_adapter_available (demux->priv->input_adapter));
1278 GST_MANIFEST_UNLOCK (demux);
1283 /* Called with TRACKS_LOCK taken */
1285 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1289 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1290 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1292 gst_adaptive_demux_track_flush (track);
1293 if (gst_pad_is_active (track->sinkpad)) {
1294 gst_pad_set_active (track->sinkpad, FALSE);
1295 gst_pad_set_active (track->sinkpad, TRUE);
1300 /* Resets all tracks to their initial state, ready to receive new data. */
1302 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1304 TRACKS_LOCK (demux);
1305 g_queue_foreach (demux->priv->periods,
1306 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1307 TRACKS_UNLOCK (demux);
1310 /* Subclasses will call this function to ensure that a new input period is
1311 * available to receive new streams and tracks */
1313 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1315 if (demux->input_period && !demux->input_period->prepared) {
1316 GST_DEBUG_OBJECT (demux, "Using existing input period");
1320 if (demux->input_period) {
1321 GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1322 demux->input_period->has_next_period = TRUE;
1324 GST_DEBUG_OBJECT (demux, "Setting up new period");
1326 demux->input_period = gst_adaptive_demux_period_new (demux);
1331 /* must be called with manifest_lock taken */
1333 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1335 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1338 gst_adaptive_demux_stop_tasks (demux, TRUE);
1341 klass->reset (demux);
1343 /* Disable and remove all outputs */
1344 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1345 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1346 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1348 g_list_free (demux->priv->outputs);
1349 demux->priv->outputs = NULL;
1351 g_queue_clear_full (demux->priv->periods,
1352 (GDestroyNotify) gst_adaptive_demux_period_unref);
1354 /* The output period always has an extra ref taken on it */
1355 if (demux->output_period)
1356 gst_adaptive_demux_period_unref (demux->output_period);
1357 demux->output_period = NULL;
1358 /* The input period doesn't have an extra ref taken on it */
1359 demux->input_period = NULL;
1361 gst_adaptive_demux_start_new_period (demux);
1363 g_free (demux->manifest_uri);
1364 g_free (demux->manifest_base_uri);
1365 demux->manifest_uri = NULL;
1366 demux->manifest_base_uri = NULL;
1368 gst_adapter_clear (demux->priv->input_adapter);
1369 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1371 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1372 demux->instant_rate_multiplier = 1.0;
1374 demux->priv->duration = GST_CLOCK_TIME_NONE;
1376 demux->priv->percent = -1;
1377 demux->priv->is_buffering = TRUE;
1379 demux->have_group_id = FALSE;
1380 demux->group_id = G_MAXUINT;
1381 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1383 demux->priv->global_output_position = 0;
1385 demux->priv->n_audio_streams = 0;
1386 demux->priv->n_video_streams = 0;
1387 demux->priv->n_subtitle_streams = 0;
1389 gst_flow_combiner_reset (demux->priv->flowcombiner);
1393 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1395 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1396 gboolean res = FALSE;
1398 GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1400 switch (GST_EVENT_TYPE (event)) {
1401 case GST_EVENT_SEEK:
1403 res = gst_adaptive_demux_handle_seek_event (demux, event);
1406 case GST_EVENT_SELECT_STREAMS:
1408 res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1412 res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1418 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1419 static GstAdaptiveDemux2Stream *
1420 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1424 /* We only look in the streams of the input period (i.e. with active streams) */
1425 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1426 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1427 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1436 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1439 GstAdaptiveDemux2Stream *stream;
1440 GstStreamCollection *collection = NULL;
1441 gboolean pending_tracks_activated = FALSE;
1443 GST_MANIFEST_LOCK (demux);
1445 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1446 if (stream == NULL) {
1447 GST_WARNING_OBJECT (demux,
1448 "Failed to locate stream for collection message");
1452 gst_message_parse_stream_collection (msg, &collection);
1456 TRACKS_LOCK (demux);
1458 if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1459 &pending_tracks_activated)) {
1460 TRACKS_UNLOCK (demux);
1462 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1463 (_("Stream format can't be handled")),
1464 ("The streams provided by the multiplex are ambiguous"));
1468 if (pending_tracks_activated) {
1469 /* If pending tracks were handled, then update the demuxer collection */
1470 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1471 demux->input_period == demux->output_period) {
1472 gst_adaptive_demux_post_collection (demux);
1475 /* If we discovered pending tracks and we no longer have any, we can ensure
1476 * selected tracks are started */
1477 if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1478 GList *iter = demux->input_period->streams;
1479 for (; iter; iter = iter->next) {
1480 GstAdaptiveDemux2Stream *new_stream = iter->data;
1482 /* The stream that posted this collection was already started. If a
1483 * different stream is now selected, start it */
1484 if (stream != new_stream
1485 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1486 gst_adaptive_demux2_stream_start (new_stream);
1490 TRACKS_UNLOCK (demux);
1493 GST_MANIFEST_UNLOCK (demux);
1496 gst_object_unref (collection);
1497 gst_message_unref (msg);
1502 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1504 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1506 switch (GST_MESSAGE_TYPE (msg)) {
1507 case GST_MESSAGE_STREAM_COLLECTION:
1509 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1512 case GST_MESSAGE_ERROR:{
1513 GstAdaptiveDemux2Stream *stream = NULL;
1515 gchar *debug = NULL;
1516 gchar *new_error = NULL;
1517 const GstStructure *details = NULL;
1519 GST_MANIFEST_LOCK (demux);
1521 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1522 if (stream == NULL) {
1523 GST_WARNING_OBJECT (demux,
1524 "Failed to locate stream for errored element");
1525 GST_MANIFEST_UNLOCK (demux);
1529 gst_message_parse_error (msg, &err, &debug);
1531 GST_WARNING_OBJECT (demux,
1532 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1533 err->message, debug);
1536 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1538 g_free (err->message);
1539 err->message = new_error;
1542 gst_message_parse_error_details (msg, &details);
1544 gst_structure_get_uint (details, "http-status-code",
1545 &stream->last_status_code);
1548 /* error, but ask to retry */
1549 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1550 gst_adaptive_demux2_stream_parse_error (stream, err);
1551 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1557 GST_MANIFEST_UNLOCK (demux);
1559 gst_message_unref (msg);
1568 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1571 /* must be called with manifest_lock taken */
1573 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1575 GstAdaptiveDemuxClass *klass;
1577 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1579 if (klass->get_period_start_time == NULL)
1582 return klass->get_period_start_time (demux);
1585 /* must be called with manifest_lock taken */
1587 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1588 gboolean first_and_live)
1591 GstClockTime period_start;
1592 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1595 g_return_val_if_fail (demux->input_period->streams, FALSE);
1596 g_assert (demux->input_period->prepared == FALSE);
1598 new_streams = demux->input_period->streams;
1600 if (!gst_adaptive_demux2_is_running (demux)) {
1601 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1605 GST_DEBUG_OBJECT (demux,
1606 "Preparing %d streams for period %d , first_and_live:%d",
1607 g_list_length (new_streams), demux->input_period->period_num,
1610 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1611 GstAdaptiveDemux2Stream *stream = iter->data;
1613 GST_DEBUG_OBJECT (stream, "Preparing stream");
1615 stream->need_header = TRUE;
1616 stream->discont = TRUE;
1618 /* Grab the first stream time for live streams
1619 * * If the stream is selected
1620 * * Or it provides dynamic tracks (in which case we need to force an update)
1623 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1624 || stream->pending_tracks)) {
1625 /* TODO we only need the first timestamp, maybe create a simple function to
1626 * get the current PTS of a fragment ? */
1627 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1628 gst_adaptive_demux2_stream_update_fragment_info (stream);
1630 GST_DEBUG_OBJECT (stream,
1631 "Got stream time %" GST_STIME_FORMAT,
1632 GST_STIME_ARGS (stream->fragment.stream_time));
1634 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1635 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1637 min_stream_time = stream->fragment.stream_time;
1642 period_start = gst_adaptive_demux_get_period_start_time (demux);
1644 /* For live streams, the subclass is supposed to seek to the current fragment
1645 * and then tell us its stream time in stream->fragment.stream_time. We now
1646 * also have to seek our demuxer segment to reflect this.
1648 * FIXME: This needs some refactoring at some point.
1650 if (first_and_live) {
1651 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1652 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1653 GST_SEEK_TYPE_NONE, -1, NULL);
1656 GST_DEBUG_OBJECT (demux,
1657 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1658 " demux segment %" GST_SEGMENT_FORMAT,
1659 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1662 /* Synchronize stream start/current positions */
1663 if (min_stream_time == GST_CLOCK_STIME_NONE)
1664 min_stream_time = period_start;
1666 min_stream_time += period_start;
1667 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1668 GstAdaptiveDemux2Stream *stream = iter->data;
1669 stream->start_position = stream->current_position = min_stream_time;
1672 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1673 GstAdaptiveDemux2Stream *stream = iter->data;
1674 stream->compute_segment = TRUE;
1675 stream->first_and_live = first_and_live;
1677 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1678 demux->input_period->prepared = TRUE;
1683 static GstAdaptiveDemuxTrack *
1684 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1688 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1689 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1690 if (!g_strcmp0 (track->stream_id, stream_id))
1697 /* TRACKS_LOCK hold */
1699 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1701 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1702 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1703 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1705 gint min_percent = -1, percent;
1706 gboolean all_eos = TRUE;
1708 /* Go over all active tracks of the output period and update level */
1710 /* Check that all tracks are above their respective low thresholds (different
1711 * tracks may have different fragment durations yielding different buffering
1712 * percentages) Overall buffering percent is the lowest. */
1713 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1714 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1716 GST_LOG_OBJECT (demux,
1717 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1718 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1719 track->stream_id, track->period_num, track->active, track->selected,
1720 track->eos, GST_TIME_ARGS (track->level_time),
1721 GST_TIME_ARGS (track->buffering_threshold));
1723 if (track->active && track->selected) {
1728 if (min_level_time == GST_CLOCK_TIME_NONE) {
1729 min_level_time = track->level_time;
1730 } else if (track->level_time < min_level_time) {
1731 min_level_time = track->level_time;
1734 if (track->type & GST_STREAM_TYPE_VIDEO
1735 && video_level_time > track->level_time)
1736 video_level_time = track->level_time;
1738 if (track->type & GST_STREAM_TYPE_AUDIO
1739 && audio_level_time > track->level_time)
1740 audio_level_time = track->level_time;
1742 if (track->level_time != GST_CLOCK_TIME_NONE
1743 && track->buffering_threshold != 0) {
1745 gst_util_uint64_scale (track->level_time, 100,
1746 track->buffering_threshold);
1747 if (min_percent < 0 || cur_percent < min_percent)
1748 min_percent = cur_percent;
1754 GST_DEBUG_OBJECT (demux,
1755 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1756 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1758 /* Update demuxer video/audio level properties */
1759 GST_OBJECT_LOCK (demux);
1760 demux->current_level_time_video = video_level_time;
1761 demux->current_level_time_audio = audio_level_time;
1762 GST_OBJECT_UNLOCK (demux);
1764 if (min_percent < 0 && !all_eos)
1767 if (min_percent > 100 || all_eos)
1770 percent = MAX (0, min_percent);
1772 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1774 if (demux->priv->is_buffering) {
1776 demux->priv->is_buffering = FALSE;
1777 if (demux->priv->percent != percent) {
1778 demux->priv->percent = percent;
1779 demux->priv->percent_changed = TRUE;
1781 } else if (percent < 1) {
1782 demux->priv->is_buffering = TRUE;
1783 if (demux->priv->percent != percent) {
1784 demux->priv->percent = percent;
1785 demux->priv->percent_changed = TRUE;
1789 if (demux->priv->percent_changed)
1790 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1791 demux->priv->is_buffering);
1794 /* With TRACKS_LOCK held */
1796 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1801 if (!demux->priv->percent_changed)
1804 BUFFERING_LOCK (demux);
1805 percent = demux->priv->percent;
1806 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1807 TRACKS_UNLOCK (demux);
1808 gst_element_post_message ((GstElement *) demux, msg);
1810 BUFFERING_UNLOCK (demux);
1811 TRACKS_LOCK (demux);
1812 if (percent == demux->priv->percent)
1813 demux->priv->percent_changed = FALSE;
1816 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1817 GstAdaptiveDemux2Stream *
1818 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1819 GstAdaptiveDemuxTrack * track)
1823 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1824 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1825 if (g_list_find (stream->tracks, track))
1832 /* Called from seek handler
1834 * This function is used when a (flushing) seek caused a new period to be activated.
1836 * This will ensure that:
1837 * * the current output period is marked as finished (EOS)
1838 * * Any potential intermediate (non-input/non-output) periods are removed
1839 * * That the new input period is prepared and ready
1842 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1846 GST_DEBUG_OBJECT (demux,
1847 "Preparing new input period %u", demux->input_period->period_num);
1849 /* Prepare the new input period */
1850 gst_adaptive_demux_update_collection (demux, demux->input_period);
1852 /* Transfer the previous selection to the new input period */
1853 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1854 demux->output_period);
1855 gst_adaptive_demux_prepare_streams (demux, FALSE);
1857 /* Remove all periods except for the input (last) and output (first) period */
1858 while (demux->priv->periods->length > 2) {
1859 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1860 /* Mark all tracks of the removed period as not selected and EOS so they
1861 * will be skipped / ignored */
1862 for (iter = period->tracks; iter; iter = iter->next) {
1863 GstAdaptiveDemuxTrack *track = iter->data;
1864 track->selected = FALSE;
1867 gst_adaptive_demux_period_unref (period);
1870 /* Mark all tracks of the output period as EOS so that the output loop
1871 * will immediately move to the new period */
1872 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1873 GstAdaptiveDemuxTrack *track = iter->data;
1877 /* Go over all slots, and clear any pending track */
1878 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1879 OutputSlot *slot = (OutputSlot *) iter->data;
1881 if (slot->pending_track != NULL) {
1882 GST_DEBUG_OBJECT (demux,
1883 "Removing track '%s' as pending from output of current track '%s'",
1884 slot->pending_track->stream_id, slot->track->stream_id);
1885 gst_adaptive_demux_track_unref (slot->pending_track);
1886 slot->pending_track = NULL;
1891 /* must be called with manifest_lock taken */
1893 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1894 gint64 * range_start, gint64 * range_stop)
1896 GstAdaptiveDemuxClass *klass;
1898 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1900 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1902 return klass->get_live_seek_range (demux, range_start, range_stop);
1905 /* must be called with manifest_lock taken */
1907 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1908 GstAdaptiveDemux2Stream * stream)
1910 gint64 range_start, range_stop;
1911 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1912 GST_LOG_OBJECT (stream,
1913 "stream position %" GST_TIME_FORMAT " live seek range %"
1914 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1915 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1916 GST_STIME_ARGS (range_stop));
1917 return (stream->current_position >= range_start
1918 && stream->current_position <= range_stop);
1924 /* must be called with manifest_lock taken */
1926 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1928 GstAdaptiveDemuxClass *klass;
1930 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1931 if (gst_adaptive_demux_is_live (demux)) {
1932 return klass->get_live_seek_range != NULL;
1935 return klass->seek != NULL;
1939 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1940 GstSeekType start_type, GstSeekType stop_type)
1944 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1945 GstAdaptiveDemux2Stream *stream = iter->data;
1947 /* Make sure the download loop clears and restarts on the next start,
1948 * which will recompute the stream segment */
1949 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1950 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1951 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1952 stream->start_position = 0;
1954 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1955 stream->start_position = demux->segment.start;
1956 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1957 stream->start_position = demux->segment.stop;
/* Non-zero if the seek flags @f request any snapping or key-unit behaviour.
 * The argument is parenthesized so that expressions such as (a | b) are
 * masked as a whole. */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |          \
                                GST_SEEK_FLAG_SNAP_AFTER |           \
                                GST_SEEK_FLAG_SNAP_NEAREST |         \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS |  \
                                GST_SEEK_FLAG_KEY_UNIT))
/* The seek flags @f with the SNAP_BEFORE / SNAP_AFTER / SNAP_NEAREST bits
 * cleared. Key-unit flags are left untouched. */
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE |    \
                                      GST_SEEK_FLAG_SNAP_AFTER |     \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1971 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1974 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1978 GstSeekType start_type, stop_type;
1982 gboolean ret = FALSE;
1983 GstSegment oldsegment;
1984 GstEvent *flush_event;
1986 GST_INFO_OBJECT (demux, "Received seek event");
1988 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1991 if (format != GST_FORMAT_TIME) {
1992 GST_WARNING_OBJECT (demux,
1993 "Adaptive demuxers only support TIME-based seeking");
1994 gst_event_unref (event);
1998 if (flags & GST_SEEK_FLAG_SEGMENT) {
1999 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2000 gst_event_unref (event);
2004 seqnum = gst_event_get_seqnum (event);
2006 GST_API_LOCK (demux);
2007 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2008 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2009 GST_API_UNLOCK (demux);
2013 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2014 /* For instant rate seeks, reply directly and update
2015 * our segment so the new rate is reflected in any future
2018 gdouble rate_multiplier;
2020 /* instant rate change only supported if direction does not change. All
2021 * other requirements are already checked before creating the seek event
2022 * but let's double-check here to be sure */
2023 if ((demux->segment.rate > 0 && rate < 0) ||
2024 (demux->segment.rate < 0 && rate > 0) ||
2025 start_type != GST_SEEK_TYPE_NONE ||
2026 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2027 GST_ERROR_OBJECT (demux,
2028 "Instant rate change seeks only supported in the "
2029 "same direction, without flushing and position change");
2033 rate_multiplier = rate / demux->segment.rate;
2035 ev = gst_event_new_instant_rate_change (rate_multiplier,
2036 (GstSegmentFlags) flags);
2037 gst_event_set_seqnum (ev, seqnum);
2039 ret = gst_adaptive_demux_push_src_event (demux, ev);
2042 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2043 demux->instant_rate_multiplier = rate_multiplier;
2044 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2049 if (!gst_adaptive_demux_can_seek (demux))
2052 /* We can only accept flushing seeks from this point onward */
2053 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2054 GST_ERROR_OBJECT (demux,
2055 "Non-flushing non-instant-rate seeks are not possible");
2059 if (gst_adaptive_demux_is_live (demux)) {
2060 gint64 range_start, range_stop;
2061 gboolean changed = FALSE;
2062 gboolean start_valid = TRUE, stop_valid = TRUE;
2064 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2066 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2070 GST_DEBUG_OBJECT (demux,
2071 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2072 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2074 /* Handle relative positioning for live streams (relative to the range_stop) */
2075 if (start_type == GST_SEEK_TYPE_END) {
2076 start = range_stop + start;
2077 start_type = GST_SEEK_TYPE_SET;
2080 if (stop_type == GST_SEEK_TYPE_END) {
2081 stop = range_stop + stop;
2082 stop_type = GST_SEEK_TYPE_SET;
2086 /* Adjust the requested start/stop position if it falls beyond the live
2088 * The only case where we don't adjust is for the starting point of
2089 * an accurate seek (start if forward and stop if backwards)
2091 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2092 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2093 GST_DEBUG_OBJECT (demux,
2094 "seek before live stream start, setting to range start: %"
2095 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2096 start = range_start;
2099 /* truncate stop position also if set */
2100 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2101 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2102 GST_DEBUG_OBJECT (demux,
2103 "seek ending after live start, adjusting to: %"
2104 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2109 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2110 (start < range_start || start > range_stop)) {
2111 GST_WARNING_OBJECT (demux,
2112 "Seek to invalid position start:%" GST_STIME_FORMAT
2113 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2114 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2115 GST_STIME_ARGS (range_stop));
2116 start_valid = FALSE;
2118 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2119 (stop < range_start || stop > range_stop)) {
2120 GST_WARNING_OBJECT (demux,
2121 "Seek to invalid position stop:%" GST_STIME_FORMAT
2122 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2123 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2124 GST_STIME_ARGS (range_stop));
2128 /* If the seek position is still outside of the seekable range, refuse the seek */
2129 if (!start_valid || !stop_valid)
2132 /* Re-create seek event with changed/updated values */
2134 gst_event_unref (event);
2136 gst_event_new_seek (rate, format, flags,
2137 start_type, start, stop_type, stop);
2138 gst_event_set_seqnum (event, seqnum);
2142 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2144 /* have a backup in case seek fails */
2145 gst_segment_copy_into (&demux->segment, &oldsegment);
2147 GST_DEBUG_OBJECT (demux, "sending flush start");
2148 flush_event = gst_event_new_flush_start ();
2149 gst_event_set_seqnum (flush_event, seqnum);
2151 gst_adaptive_demux_push_src_event (demux, flush_event);
2153 gst_adaptive_demux_stop_tasks (demux, FALSE);
2154 gst_adaptive_demux_reset_tracks (demux);
2156 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2158 if (!IS_SNAP_SEEK (flags) && !(flags & GST_SEEK_FLAG_ACCURATE)) {
2159 /* If no accurate seeking was specified, we want to default to seeking to
2160 * the previous segment for efficient/fast playback. */
2161 flags |= GST_SEEK_FLAG_KEY_UNIT;
2164 if (IS_SNAP_SEEK (flags)) {
2165 GstAdaptiveDemux2Stream *default_stream = NULL;
2166 GstAdaptiveDemux2Stream *stream = NULL;
2169 * Handle snap seeks as follows:
2170 * 1) do the snap seeking a (random) active stream
2171 * 1.1) If none are active yet (early-seek), pick a random default one
2172 * 2) use the final position on this stream to seek
2173 * on the other streams to the same position
2175 * We can't snap at all streams at the same time as they might end in
2176 * different positions, so just pick one and align all others to that
2180 /* Pick a random active stream on which to do the stream seek */
2181 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2182 GstAdaptiveDemux2Stream *cand = iter->data;
2183 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2187 if (default_stream == NULL
2188 && gst_adaptive_demux2_stream_is_default_locked (cand))
2189 default_stream = cand;
2193 stream = default_stream;
2196 GstClockTimeDiff ts;
2197 GstSeekFlags stream_seek_flags = flags;
2199 /* snap-seek on the chosen stream and then
2200 * use the resulting position to seek on all streams */
2202 if (start_type != GST_SEEK_TYPE_NONE)
2205 ts = gst_segment_position_from_running_time (&demux->segment,
2206 GST_FORMAT_TIME, demux->priv->global_output_position);
2207 start_type = GST_SEEK_TYPE_SET;
2210 if (stop_type != GST_SEEK_TYPE_NONE)
2213 stop_type = GST_SEEK_TYPE_SET;
2214 ts = gst_segment_position_from_running_time (&demux->segment,
2215 GST_FORMAT_TIME, demux->priv->global_output_position);
2219 if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2220 ts, &ts) != GST_FLOW_OK) {
2221 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2224 /* replace event with a new one without snapping to seek on all streams */
2225 gst_event_unref (event);
2232 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2233 start_type, start, stop_type, stop);
2234 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2238 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2239 start, stop_type, stop, &update);
2242 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2244 ret = demux_class->seek (demux, event);
2248 /* Is there anything else we can do if it fails? */
2249 gst_segment_copy_into (&oldsegment, &demux->segment);
2251 demux->priv->segment_seqnum = seqnum;
2253 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2255 /* Resetting flow combiner */
2256 gst_flow_combiner_reset (demux->priv->flowcombiner);
2258 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2259 flush_event = gst_event_new_flush_stop (TRUE);
2260 gst_event_set_seqnum (flush_event, seqnum);
2261 gst_adaptive_demux_push_src_event (demux, flush_event);
2263 /* If the seek generated a new period, prepare it */
2264 if (!demux->input_period->prepared) {
2265 /* This can only happen on flushing seeks */
2266 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2267 gst_adaptive_demux_seek_to_input_period (demux);
2270 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2271 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2273 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2274 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2276 /* Reset the global output position (running time) for when the output loop restarts */
2277 demux->priv->global_output_position = 0;
2279 /* After a flushing seek, any instant-rate override is undone */
2280 demux->instant_rate_multiplier = 1.0;
2282 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2284 /* Restart the demux */
2285 gst_adaptive_demux_start_tasks (demux);
2288 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2289 GST_API_UNLOCK (demux);
2290 gst_event_unref (event);
2295 /* Returns TRUE if the stream has at least one selected track.
 * The check walks the stream->tracks list and tests the `selected` flag of
 * each GstAdaptiveDemuxTrack found there. */
2297 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2302 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2303 GstAdaptiveDemuxTrack *track = tmp->data;
2305 if (track->selected)
/* Applies a stream selection request to the current output period.
 *
 * @demux: the adaptive demuxer
 * @streams: list whose items are read as stream-id strings (gchar *)
 *
 * The body also uses a `seqnum` value, stored in
 * demux->priv->requested_selection_seqnum after the streams were handled.
 * NOTE(review): presumably the third parameter; its declaration is not
 * visible here - confirm.
 *
 * Steps, as far as visible:
 *  - the scheduler lock is attempted and TRACKS_LOCK is taken,
 *  - a warning is logged when demux->input_period is set and differs from
 *    demux->output_period (migration between periods),
 *  - every requested id is resolved with find_track_for_stream_id() against
 *    the output period; an unrecognized id sets selection_handled to FALSE
 *    and jumps to select_streams_done,
 *  - tracks of the output period get their `selected` / `draining` flags
 *    updated from the resolved list,
 *  - streams are started or stopped depending on whether they have selected
 *    tracks, and their track sinkpads are (de)activated accordingly.
 *
 * Returns: selection_handled (initialised to TRUE).
 */
2313 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2316 gboolean selection_handled = TRUE;
2318 GList *tracks = NULL;
2320 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2323 TRACKS_LOCK (demux);
2324 /* We can't do stream selection if we are migrating between periods */
2325 if (demux->input_period && demux->output_period != demux->input_period) {
2326 GST_WARNING_OBJECT (demux,
2327 "Stream selection while migrating between periods is not possible");
2328 TRACKS_UNLOCK (demux);
2331 /* Validate the streams and fill:
2332 * tracks : list of tracks corresponding to requested streams
2334 for (iter = streams; iter; iter = iter->next) {
2335 gchar *stream_id = (gchar *) iter->data;
2336 GstAdaptiveDemuxTrack *track;
2338 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2339 track = find_track_for_stream_id (demux->output_period, stream_id);
2341 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2342 selection_handled = FALSE;
2343 goto select_streams_done;
2345 tracks = g_list_append (tracks, track);
2346 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2349 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2350 * SCHEDULING THREAD */
2352 /* FIXME: We want to iterate all streams, mark them as deselected,
2353 * then iterate tracks and mark any streams that have at least 1
2354 * active output track, then loop over all streams again and start/stop
2357 /* Go over all tracks present and (de)select based on current selection */
2358 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2359 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2361 if (track->selected && !g_list_find (tracks, track)) {
2362 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2363 track->stream_id, track->active);
2364 track->selected = FALSE;
2365 track->draining = TRUE;
2366 } else if (!track->selected && g_list_find (tracks, track)) {
2367 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2369 track->selected = TRUE;
2373 /* Start or stop streams based on the updated track selection */
2374 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2375 GstAdaptiveDemux2Stream *stream = iter->data;
2378 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2379 gboolean should_be_running =
2380 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2382 if (!is_running && should_be_running) {
2383 GstClockTime output_running_ts = demux->priv->global_output_position;
2384 GstClockTime start_position;
2386 /* Calculate where we should start the stream, and then
2388 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2390 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2391 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2392 GST_TIME_ARGS (output_running_ts), &demux->segment);
2395 gst_segment_position_from_running_time (&demux->segment,
2396 GST_FORMAT_TIME, output_running_ts);
2398 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2400 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2401 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2403 stream->current_position = stream->start_position = start_position;
2404 stream->compute_segment = TRUE;
2406 /* If output has already begun, ensure we seek this segment
2407 * to the correct restart position when the download loop begins */
2408 if (output_running_ts != 0)
2409 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2411 /* Activate track pads for this stream */
2412 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2413 GstAdaptiveDemuxTrack *track =
2414 (GstAdaptiveDemuxTrack *) trackiter->data;
2415 gst_pad_set_active (track->sinkpad, TRUE);
2418 gst_adaptive_demux2_stream_start (stream);
2419 } else if (is_running && !should_be_running) {
2420 /* Stream should not be running and needs stopping */
2421 gst_adaptive_demux2_stream_stop (stream);
2423 /* Set all track sinkpads to inactive for this stream */
2424 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2425 GstAdaptiveDemuxTrack *track =
2426 (GstAdaptiveDemuxTrack *) trackiter->data;
2427 gst_pad_set_active (track->sinkpad, FALSE);
/* Publish the seqnum of the selection that was just applied; it is read
 * atomically elsewhere to detect a pending selection change */
2432 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
/* Common exit: refresh and post the buffering state while TRACKS_LOCK is
 * still held, then release the locks and free the temporary track list
 * (the list only borrows the track pointers) */
2434 select_streams_done:
2435 demux_update_buffering_locked (demux);
2436 demux_post_buffering_locked (demux);
2438 TRACKS_UNLOCK (demux);
2439 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2442 g_list_free (tracks);
2443 return selection_handled;
/* Handles a SELECT_STREAMS event.
 *
 * The event seqnum is first compared with
 * demux->priv->requested_selection_seqnum; a match is logged as
 * "Already handled/handling".
 * NOTE(review): what happens to the event / return value in that case is
 * not visible here - confirm.
 *
 * The requested stream ids are parsed from the event and handed to
 * handle_stream_selection() together with the event seqnum. The parsed list
 * is then released with g_list_free_full (streams, g_free) and the event is
 * unreffed.
 *
 * Returns: selection_handled
 */
2447 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2451 gboolean selection_handled;
2453 if (GST_EVENT_SEQNUM (event) ==
2454 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2455 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2456 GST_EVENT_SEQNUM (event));
2460 gst_event_parse_select_streams (event, &streams);
2462 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2463 g_list_free_full (streams, g_free);
2465 gst_event_unref (event);
2466 return selection_handled;
/* Pad event handler (pad, parent, event); `parent` is cast to the demuxer.
 *
 * Visible handling per event type:
 *  - SEEK: a seek whose seqnum equals demux->priv->segment_seqnum is logged
 *    as duplicated and unreffed; seeks are otherwise passed to
 *    gst_adaptive_demux_handle_seek_event().
 *  - LATENCY: the event is unreffed here (see the comment in that case).
 *  - QOS: diff and timestamp are parsed from the event, an earliest_time is
 *    derived from them and, under the object lock, stored in
 *    demux->priv->qos_earliest_time when the stored value is invalid or
 *    smaller.
 *  - SELECT_STREAMS: passed to
 *    gst_adaptive_demux_handle_select_streams_event().
 * The final statement hands the event to gst_pad_event_default().
 */
2470 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2473 GstAdaptiveDemux *demux;
2475 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2477 switch (event->type) {
2478 case GST_EVENT_SEEK:
2480 guint32 seqnum = gst_event_get_seqnum (event);
2481 if (seqnum == demux->priv->segment_seqnum) {
2482 GST_LOG_OBJECT (pad,
2483 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2484 gst_event_unref (event);
2487 return gst_adaptive_demux_handle_seek_event (demux, event);
2489 case GST_EVENT_LATENCY:{
2490 /* Upstream and our internal source are irrelevant
2491 * for latency, and we should not fail here to
2492 * configure the latency */
2493 gst_event_unref (event);
2496 case GST_EVENT_QOS:{
2497 GstClockTimeDiff diff;
2498 GstClockTime timestamp;
2499 GstClockTime earliest_time;
/* Type and proportion are not needed (NULL); only the jitter (diff) and
 * the timestamp of the QoS report are read back */
2501 gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2502 /* Only take into account lateness if late */
2504 earliest_time = timestamp + 2 * diff;
2506 earliest_time = timestamp;
2508 GST_OBJECT_LOCK (demux);
2509 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2510 earliest_time > demux->priv->qos_earliest_time) {
2511 demux->priv->qos_earliest_time = earliest_time;
2512 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2513 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2515 GST_OBJECT_UNLOCK (demux);
2518 case GST_EVENT_SELECT_STREAMS:
2520 return gst_adaptive_demux_handle_select_streams_event (demux, event);
2526 return gst_pad_event_default (pad, parent, event);
/* Answers a SEEKING query on behalf of the demuxer.
 *
 * Returns FALSE immediately while demux->priv->have_manifest is not set.
 * Otherwise the manifest lock is taken and the query format is parsed.
 * For GST_FORMAT_TIME:
 *  - seekability is taken from gst_adaptive_demux_can_seek(),
 *  - for live streams the range is requested from
 *    gst_adaptive_demux_get_live_seek_range(); a "can't answer seeking
 *    query" path unlocks the manifest lock,
 *  - demux->priv->duration is read and checked for being valid and > 0,
 *  - the answer is written with gst_query_set_seeking().
 * NOTE(review): the exact branch structure (live vs. non-live, and where
 * `ret` is set / returned) is not fully visible here - confirm.
 */
2530 gst_adaptive_demux_handle_query_seeking (GstAdaptiveDemux * demux,
2533 GstFormat fmt = GST_FORMAT_UNDEFINED;
2536 gboolean ret = FALSE;
2538 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2539 GST_INFO_OBJECT (demux,
2540 "Don't have manifest yet, can't answer seeking query");
2541 return FALSE; /* can't answer without manifest */
2544 GST_MANIFEST_LOCK (demux);
2546 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2547 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2548 if (fmt == GST_FORMAT_TIME) {
2549 GstClockTime duration;
2550 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2554 if (gst_adaptive_demux_is_live (demux)) {
2555 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2558 GST_MANIFEST_UNLOCK (demux);
2559 GST_INFO_OBJECT (demux, "can't answer seeking query");
2563 duration = demux->priv->duration;
2564 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2568 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2569 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2570 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2571 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2573 GST_MANIFEST_UNLOCK (demux);
/* Pad query handler (pad, parent, query); `parent` is cast to the demuxer.
 *
 * Visible handling per query type:
 *  - DURATION: for live streams the duration is set to -1 (unknown). For
 *    GST_FORMAT_TIME with a manifest available, demux->priv->duration is
 *    read under the manifest lock and set on the query when valid and > 0.
 *  - LATENCY: answered with gst_query_set_latency (query, FALSE, 0, -1).
 *  - SEEKING: delegated to gst_adaptive_demux_handle_query_seeking().
 *  - a case that answers with demux->manifest_uri (when set) under the
 *    manifest lock,
 *  - SELECTABLE: answered with TRUE.
 * `ret` starts as FALSE.
 * NOTE(review): the assignments to `ret` in the individual cases and the
 * final return are not visible here - confirm.
 */
2578 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2581 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2582 gboolean ret = FALSE;
2587 switch (query->type) {
2588 case GST_QUERY_DURATION:{
2590 GstClockTime duration = GST_CLOCK_TIME_NONE;
2592 gst_query_parse_duration (query, &fmt, NULL);
2594 if (gst_adaptive_demux_is_live (demux)) {
2595 /* We are able to answer this query: the duration is unknown */
2596 gst_query_set_duration (query, fmt, -1);
2601 if (fmt == GST_FORMAT_TIME
2602 && g_atomic_int_get (&demux->priv->have_manifest)) {
2604 GST_MANIFEST_LOCK (demux);
2605 duration = demux->priv->duration;
2606 GST_MANIFEST_UNLOCK (demux);
2608 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2609 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2614 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2615 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2618 case GST_QUERY_LATENCY:{
2619 gst_query_set_latency (query, FALSE, 0, -1);
2623 case GST_QUERY_SEEKING:
2624 ret = gst_adaptive_demux_handle_query_seeking (demux, query);
2628 GST_MANIFEST_LOCK (demux);
2630 /* TODO HLS can answer this differently it seems */
2631 if (demux->manifest_uri) {
2632 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2633 * playlist or the uri of the last downloaded fragment? */
2634 gst_query_set_uri (query, demux->manifest_uri);
2638 GST_MANIFEST_UNLOCK (demux);
2640 case GST_QUERY_SELECTABLE:
2641 gst_query_set_selectable (query, TRUE);
2645 /* Don't forward queries upstream because of the special nature of this
2646 * "demuxer", which relies on the upstream element only to be fed
/* Element-level query handler (element, query).
 *
 * Visible handling per query type:
 *  - BUFFERING: the buffering-range format is parsed. While
 *    demux->output_period is not set, non-TIME formats are logged as not
 *    answerable and TIME is logged as still being answered.
 *    NOTE(review): the values written to the query / returned in those two
 *    paths are not visible here - confirm.
 *  - SEEKING: answered directly through
 *    gst_adaptive_demux_handle_query_seeking().
 * The final statement chains up to the parent class' query handler.
 */
2656 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
2658 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
2660 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
2662 switch (GST_QUERY_TYPE (query)) {
2663 case GST_QUERY_BUFFERING:
2666 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2668 if (!demux->output_period) {
2669 if (format != GST_FORMAT_TIME) {
2670 GST_DEBUG_OBJECT (demux,
2671 "No period setup yet, can't answer non-TIME buffering queries");
2675 GST_DEBUG_OBJECT (demux,
2676 "No period setup yet, but still answering buffering query");
2680 case GST_QUERY_SEEKING:
2682 /* Source pads might not be present early on which would cause the default
2683 * element query handler to fail, yet we can answer this query */
2684 return gst_adaptive_demux_handle_query_seeking (demux, query);
2690 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
/* Reacts to a lost synchronization by seeking back to the live head.
 *
 * Logs a warning, builds a seek event at rate 1.0 in GST_FORMAT_TIME with
 * the FLUSH and KEY_UNIT flags, start type GST_SEEK_TYPE_END with offset 0
 * and no stop position, and passes it to
 * gst_adaptive_demux_handle_seek_event().
 */
2694 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2698 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2701 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2702 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2703 GST_SEEK_TYPE_NONE, 0);
2704 gst_adaptive_demux_handle_seek_event (demux, seek);
2709 /* Called when the scheduler starts, to kick off manifest updates
2710 * and stream downloads.
 *
 * Iterates demux->input_period->streams and calls
 * gst_adaptive_demux2_stream_start() on every stream that either has
 * pending_tracks set or is reported as selected by
 * gst_adaptive_demux2_stream_is_selected_locked(). */
2712 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2716 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2718 iter = demux->input_period->streams;
2720 for (; iter; iter = g_list_next (iter)) {
2721 GstAdaptiveDemux2Stream *stream = iter->data;
2723 /* If we need to process this stream to discover tracks *OR* it has any
2724 * tracks which are selected, start it now */
2725 if ((stream->pending_tracks == TRUE)
2726 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2727 gst_adaptive_demux2_stream_start (stream);
2733 /* must be called with manifest_lock taken.
 *
 * Starts the demuxer's processing:
 *  - when gst_adaptive_demux2_is_running() is false, only a debug message
 *    ("Not starting tasks due to shutdown") is visible for that path,
 *  - gst_adaptive_demux_scheduler_start_cb() is queued on the scheduler
 *    task through gst_adaptive_demux_loop_call(),
 *  - under TRACKS_LOCK, demux->priv->flushing is cleared and the output
 *    task is started. */
2735 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2737 if (!gst_adaptive_demux2_is_running (demux)) {
2738 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2742 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2743 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2744 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2746 TRACKS_LOCK (demux);
2747 demux->priv->flushing = FALSE;
2748 GST_DEBUG_OBJECT (demux, "Starting the output task");
2749 gst_task_start (demux->priv->output_task);
2750 TRACKS_UNLOCK (demux);
2753 /* must be called with manifest_lock taken.
 *
 * Cancels a pending manifest update: when
 * demux->priv->manifest_updates_cb is non-zero, that call is cancelled on
 * the scheduler task with gst_adaptive_demux_loop_cancel_call() and the
 * stored id is reset to 0. */
2755 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2757 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2758 if (demux->priv->manifest_updates_cb != 0) {
2759 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2760 demux->priv->manifest_updates_cb);
2761 demux->priv->manifest_updates_cb = 0;
/* Forward declaration: the function below queues this callback before its
 * definition appears in the file */
2765 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2767 /* must be called with manifest_lock taken.
 *
 * Requests the start of periodic manifest updates. Only for live streams
 * whose class reports requires_periodical_playlist_update(): when no
 * update call is pending (manifest_updates_cb == 0),
 * gst_adaptive_demux_updates_start_cb() is queued on the scheduler task and
 * the returned id is stored in demux->priv->manifest_updates_cb. */
2769 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2771 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2773 if (gst_adaptive_demux_is_live (demux)) {
2774 /* Task to periodically update the manifest */
2775 if (demux_class->requires_periodical_playlist_update (demux)) {
2776 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2777 if (demux->priv->manifest_updates_cb == 0) {
2778 demux->priv->manifest_updates_cb =
2779 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2780 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
/* Stops the demuxer's processing (demux, stop_updates).
 *
 * Visible steps:
 *  - gst_adaptive_demux_stop_manifest_update_task() is called
 *    (NOTE(review): presumably only when stop_updates is TRUE; the
 *    condition is not visible here - confirm),
 *  - under TRACKS_LOCK: the tasks of demux->input_period (if any) are
 *    stopped, demux->priv->flushing is set, the tracks_add condition is
 *    signalled and the output task is asked to stop,
 *  - after releasing TRACKS_LOCK the output task is joined,
 *  - demux->priv->qos_earliest_time is reset to GST_CLOCK_TIME_NONE.
 */
2786 /* must be called with manifest_lock taken
2787 * This function will temporarily release manifest_lock in order to join the
2789 * The api_lock will still protect it against other threads trying to modify
2790 * the demux element.
2793 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2795 GST_LOG_OBJECT (demux, "Stopping tasks");
2798 gst_adaptive_demux_stop_manifest_update_task (demux);
2800 TRACKS_LOCK (demux);
2801 if (demux->input_period)
2802 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2804 demux->priv->flushing = TRUE;
2805 g_cond_signal (&demux->priv->tracks_add);
2806 gst_task_stop (demux->priv->output_task);
2807 TRACKS_UNLOCK (demux);
2809 gst_task_join (demux->priv->output_task);
2811 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2814 /* must be called with manifest_lock taken.
 *
 * Pushes @event on the pad of every output slot in demux->priv->outputs
 * while holding TRACKS_LOCK. An extra reference is taken before each push
 * and the push results are AND-ed into `ret` (initially TRUE). For
 * FLUSH_STOP events each slot's pushed_timed_data flag is cleared. The
 * reference passed in by the caller is dropped at the end. */
2816 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2819 gboolean ret = TRUE;
2821 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2823 TRACKS_LOCK (demux);
2824 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2825 OutputSlot *slot = (OutputSlot *) iter->data;
2826 gst_event_ref (event);
2827 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2828 ret = ret & gst_pad_push_event (slot->pad, event);
2829 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2830 slot->pushed_timed_data = FALSE;
2832 TRACKS_UNLOCK (demux);
2833 gst_event_unref (event);
2837 /* must be called with manifest_lock taken.
 *
 * Stores @caps as the stream's pending caps via gst_caps_replace() and then
 * unrefs @caps, i.e. the reference passed in is released by this
 * function. */
2839 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2842 GST_DEBUG_OBJECT (stream,
2843 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2844 gst_caps_replace (&stream->pending_caps, caps);
2845 gst_caps_unref (caps);
2848 /* must be called with manifest_lock taken.
 *
 * Replaces the stream's pending tags: any previous stream->pending_tags is
 * cleared with gst_clear_tag_list() and the @tags pointer is stored as-is
 * (no additional reference is taken here). */
2850 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2853 GST_DEBUG_OBJECT (stream,
2854 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2855 gst_clear_tag_list (&stream->pending_tags);
2856 stream->pending_tags = tags;
2859 /* must be called with manifest_lock taken.
 *
 * Appends @event to the stream->pending_events list; the pointer is stored
 * as-is (no additional reference is taken here). */
2861 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2864 stream->pending_events = g_list_append (stream->pending_events, event);
/* NOTE(review): presumably the default implementation of the class hook
 * requires_periodical_playlist_update (called through the class in
 * gst_adaptive_demux_start_manifest_update_task()); the body and return
 * value are not visible here - confirm. */
2868 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2874 /* Called when a stream needs waking after the manifest is updated.
 *
 * Sets demux->priv->stream_waiting_for_manifest, which the manifest update
 * callback checks after a successful update to notify the streams. */
2876 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2878 demux->priv->stream_waiting_for_manifest = TRUE;
/* Scheduler callback performing one manifest (playlist) update.
 *
 * Runs with the manifest lock held and first clears
 * demux->priv->manifest_updates_cb.
 *  - Non-live streams: nothing to do, returns G_SOURCE_REMOVE.
 *  - gst_adaptive_demux_update_manifest() returning GST_FLOW_EOS: returns
 *    G_SOURCE_REMOVE without rescheduling.
 *  - GST_FLOW_OK: the failure counter is reset and, if
 *    stream_waiting_for_manifest is set, every stream of the input period is
 *    notified via gst_adaptive_demux2_stream_on_manifest_update() and the
 *    flag is cleared.
 *  - The failure counter update_failed_count is incremented (NOTE(review):
 *    presumably in the else-branch of the GST_FLOW_OK check - confirm).
 *    Up to DEFAULT_FAILED_COUNT failures only produce a warning; an element
 *    error is posted and schedule_again is cleared on the other path.
 *  - GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC triggers
 *    gst_adaptive_demux_handle_lost_sync().
 * When schedule_again is still TRUE the callback re-queues itself with a
 * delay of klass->get_manifest_update_interval (demux) * GST_USECOND.
 *
 * Returns: G_SOURCE_REMOVE
 */
2882 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2884 GstFlowReturn ret = GST_FLOW_OK;
2885 gboolean schedule_again = TRUE;
2887 GST_MANIFEST_LOCK (demux);
2888 demux->priv->manifest_updates_cb = 0;
2890 /* Updating playlist only needed for live playlists */
2891 if (!gst_adaptive_demux_is_live (demux)) {
2892 GST_MANIFEST_UNLOCK (demux);
2893 return G_SOURCE_REMOVE;
2896 GST_DEBUG_OBJECT (demux, "Updating playlist");
2897 ret = gst_adaptive_demux_update_manifest (demux);
2899 if (ret == GST_FLOW_EOS) {
2900 GST_MANIFEST_UNLOCK (demux);
2901 return G_SOURCE_REMOVE;
2904 if (ret == GST_FLOW_OK) {
2905 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2906 demux->priv->update_failed_count = 0;
2908 /* Wake up download tasks */
2909 if (demux->priv->stream_waiting_for_manifest) {
2912 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2913 GstAdaptiveDemux2Stream *stream = iter->data;
2914 gst_adaptive_demux2_stream_on_manifest_update (stream);
2916 demux->priv->stream_waiting_for_manifest = FALSE;
2919 demux->priv->update_failed_count++;
2921 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2922 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2923 gst_flow_get_name (ret));
2925 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2926 (_("Internal data stream error.")), ("Could not update playlist"));
2927 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2928 schedule_again = FALSE;
2932 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2933 gst_adaptive_demux_handle_lost_sync (demux);
2935 if (schedule_again) {
2936 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2938 demux->priv->manifest_updates_cb =
2939 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2940 klass->get_manifest_update_interval (demux) * GST_USECOND,
2941 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2944 GST_MANIFEST_UNLOCK (demux);
2946 return G_SOURCE_REMOVE;
/* Scheduler callback that arms the first periodic manifest update.
 *
 * Queues gst_adaptive_demux_manifest_update_cb() on the scheduler task with
 * a delay of klass->get_manifest_update_interval (demux) * GST_USECOND and
 * stores the returned id in demux->priv->manifest_updates_cb.
 *
 * Returns: G_SOURCE_REMOVE
 */
2950 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2952 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2954 /* Loop for updating of the playlist. This periodically checks if
2955 * the playlist is updated and does so, then signals the streaming
2956 * thread in case it can continue downloading now. */
2958 /* block until the next scheduled update or the signal to quit this thread */
2959 GST_DEBUG_OBJECT (demux, "Started updates task");
2960 demux->priv->manifest_updates_cb =
2961 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2962 klass->get_manifest_update_interval (demux) * GST_USECOND,
2963 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2965 return G_SOURCE_REMOVE;
/* Searches demux->priv->outputs for an existing output slot that @track
 * could take over.
 *
 * Per slot, the visible checks are, in order: slot type differs from the
 * track type; slot->pending_track is already @track; slot->pending_track is
 * some other track; slot->track exists and is draining.
 * NOTE(review): the action taken for each check (skip vs. return the slot)
 * and the fallback result are not visible here - confirm against the
 * per-check comments below.
 */
2969 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2970 GstAdaptiveDemuxTrack * track)
2974 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2975 OutputSlot *slot = (OutputSlot *) tmp->data;
2976 /* Incompatible output type */
2977 if (slot->type != track->type)
2980 /* Slot which is already assigned to this pending track */
2981 if (slot->pending_track == track)
2984 /* slot already used for another pending track */
2985 if (slot->pending_track != NULL)
2988 /* Current output track is of the same type and is draining */
2989 if (slot->track && slot->track->draining)
2996 /* TRACKS_LOCK taken.
 *
 * Walks demux->priv->outputs looking for the slot whose current track
 * (slot->track) is @track.
 * NOTE(review): presumably returns that slot, or NULL when none matches;
 * the return statements are not visible here - confirm. */
2998 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3002 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3003 OutputSlot *slot = (OutputSlot *) tmp->data;
3005 if (slot->track == track)
3012 /* TRACKS_LOCK held.
 *
 * Checks the tracks of demux->output_period for any that is selected but
 * not yet active. When the check passes, a streams-selected message is
 * built for the output period's collection, stamped with @seqnum, and the
 * stream_object of every active track is added to it.
 * NOTE(review): presumably returns the message, or NULL while a selected
 * track is still inactive; the return statements are not visible here -
 * confirm. */
3014 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3019 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3020 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3022 if (track->selected && !track->active)
3026 /* All selected tracks are active, create the message */
3028 gst_message_new_streams_selected (GST_OBJECT (demux),
3029 demux->output_period->collection);
3030 GST_MESSAGE_SEQNUM (msg) = seqnum;
3031 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3032 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3033 if (track->active) {
3034 gst_message_streams_selected_add (msg, track->stream_object);
/* Pushes the initial events for the track currently assigned to a slot
 * (slot->track) on slot->pad:
 *  1. a stream-start event carrying the track's stream id, stream flags and
 *     stream object, plus demux->group_id when demux->have_group_id is set,
 *  2. a stream-collection event for the output period's collection.
 * Afterwards all sticky events stored on the track are marked as
 * undelivered so that they get sent again.
 */
3042 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3045 GstAdaptiveDemuxTrack *track = slot->track;
3048 /* Send EVENT_STREAM_START */
3049 event = gst_event_new_stream_start (track->stream_id);
3050 if (demux->have_group_id)
3051 gst_event_set_group_id (event, demux->group_id);
3052 gst_event_set_stream_flags (event, track->flags);
3053 gst_event_set_stream (event, track->stream_object);
3054 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3056 gst_pad_push_event (slot->pad, event);
3058 /* Send EVENT_STREAM_COLLECTION */
3059 event = gst_event_new_stream_collection (demux->output_period->collection);
3060 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3062 gst_pad_push_event (slot->pad, event);
3064 /* Mark all sticky events for re-sending */
3065 gst_event_store_mark_all_undelivered (&track->sticky_events);
/* Reconciles the output slots with the current track selection.
 *
 * The requested selection seqnum is read atomically and compared with
 * demux->priv->current_selection_seqnum (NOTE(review): presumably nothing
 * is done when they are equal; the early exit is not visible here -
 * confirm). Then:
 *  - slots whose pending track is no longer selected drop that pending
 *    track,
 *  - every selected track of the output period either already has a slot,
 *    becomes the pending track of a compatible draining slot, or gets a
 *    newly created slot (marked active, initial events sent); its draining
 *    flag is cleared,
 *  - slots whose current track is draining and that have no pending track
 *    are removed and freed; the track is flushed if its stream is known and
 *    not running,
 *  - current_selection_seqnum is updated and a streams-selected message
 *    from all_selected_tracks_are_active() is posted with TRACKS_LOCK
 *    temporarily released.
 */
3069 * Called with TRACKS_LOCK taken
3072 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3075 guint requested_selection_seqnum;
3078 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3079 output slots vs active/draining tracks */
3080 requested_selection_seqnum =
3081 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3083 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3086 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3088 /* Go over all slots, and if they have a pending track that's no longer
3089 * selected, clear it so the slot can be reused */
3090 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3091 OutputSlot *slot = (OutputSlot *) tmp->data;
3093 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3094 GST_DEBUG_OBJECT (demux,
3095 "Removing deselected track '%s' as pending from output of current track '%s'",
3096 slot->pending_track->stream_id, slot->track->stream_id);
3097 gst_adaptive_demux_track_unref (slot->pending_track);
3098 slot->pending_track = NULL;
3102 /* Go over all tracks and create/re-assign/remove slots */
3103 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3104 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3106 if (track->selected) {
3107 OutputSlot *slot = find_slot_for_track (demux, track);
3109 /* 0. Track is selected and has a slot. Nothing to do */
3111 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3116 slot = find_replacement_slot_for_track (demux, track);
3118 /* 1. There is an existing slot of the same type which is currently
3119 * draining, assign this track as a replacement for it */
3120 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3121 if (slot->pending_track == NULL) {
3122 slot->pending_track = gst_adaptive_demux_track_ref (track);
3123 GST_DEBUG_OBJECT (demux,
3124 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3125 track->stream_id, track->period_num,
3126 slot->track->stream_id, slot->track->period_num);
3129 /* 2. There is no compatible replacement slot, create a new one */
3130 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3131 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3133 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3135 track->update_next_segment = TRUE;
3137 slot->track = gst_adaptive_demux_track_ref (track);
3138 track->active = TRUE;
3139 gst_adaptive_demux_send_initial_events (demux, slot);
3142 /* If we were draining this track, we no longer are */
3143 track->draining = FALSE;
3147 /* Finally check all slots have a current/pending track. If not remove it */
3148 for (tmp = demux->priv->outputs; tmp;) {
3149 OutputSlot *slot = (OutputSlot *) tmp->data;
3150 /* We should never have slots without target tracks */
3151 g_assert (slot->track);
3152 if (slot->track->draining && !slot->pending_track) {
3153 GstAdaptiveDemux2Stream *stream;
3155 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3156 slot->track->stream_id);
3157 slot->track->active = FALSE;
3159 /* If the stream feeding this track is stopped, flush and clear
3160 * the track now that it's going inactive. If the stream was not
3161 * found, it means we advanced past that period already (and the
3162 * stream was stopped and discarded) */
3163 stream = find_stream_for_track_locked (demux, slot->track);
3164 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3165 gst_adaptive_demux_track_flush (slot->track);
/* g_list_remove() returns the new list head, so iteration restarts from
 * the beginning of the updated list after a removal */
3167 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3168 gst_adaptive_demux_output_slot_free (demux, slot);
3173 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3174 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
/* TRACKS_LOCK is released around posting the message and re-taken after */
3176 TRACKS_UNLOCK (demux);
3177 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3178 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3179 TRACKS_LOCK (demux);
3183 /* TRACKS_LOCK held.
 *
 * Moves the output to the next period in demux->priv->periods:
 *  - the head period is popped and the reference held through
 *    demux->output_period is dropped,
 *  - demux->output_period takes a reference on the new head of the queue,
 *  - the new period's stream collection is posted as a message with
 *    TRACKS_LOCK temporarily released,
 *  - every selected track of the previous period is deselected and marked
 *    as draining,
 *  - requested_selection_seqnum is incremented to force
 *    check_and_handle_selection_update_locked() to re-check the slots,
 *  - the last reference on the previous period is dropped. */
3185 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3188 GstAdaptiveDemuxPeriod *previous_period;
3189 GstStreamCollection *collection;
3191 /* Grab the next period, should be demux->periods->next->data */
3192 previous_period = g_queue_pop_head (demux->priv->periods);
3194 /* Remove ref held by demux->output_period */
3195 gst_adaptive_demux_period_unref (previous_period);
3196 demux->output_period =
3197 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3199 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3200 demux->output_period->period_num);
3202 /* We can now post the collection of the new period */
3203 collection = demux->output_period->collection;
3204 TRACKS_UNLOCK (demux);
3205 gst_element_post_message (GST_ELEMENT_CAST (demux),
3206 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3207 TRACKS_LOCK (demux);
3209 /* Unselect all tracks of the previous period */
3210 for (iter = previous_period->tracks; iter; iter = iter->next) {
3211 GstAdaptiveDemuxTrack *track = iter->data;
3212 if (track->selected) {
3213 track->selected = FALSE;
3214 track->draining = TRUE;
3218 /* Force a selection re-check */
3219 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3220 check_and_handle_selection_update_locked (demux);
3222 /* Remove the final ref on the previous period now that we have done the switch */
3223 gst_adaptive_demux_period_unref (previous_period);
/* Handles an output slot that has a pending replacement track.
 *
 * The pending track counts as ready once its level_time has reached its
 * buffering_threshold or it is EOS. When the switch happens, the current
 * track is deactivated (and flushed if its stream is found and is not
 * running), the slot takes over the pending track's reference, initial
 * events are sent for the slot, and the message obtained from
 * all_selected_tracks_are_active() is posted with TRACKS_LOCK released. */
3228 /* Called with TRACKS_LOCK taken */
3230 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3233 GstAdaptiveDemuxTrack *track = slot->track;
3235 gboolean pending_is_ready;
3236 GstAdaptiveDemux2Stream *stream;
3238 /* If we have a pending track for this slot, the current track should be
3239 * draining and no longer selected */
3240 g_assert (track->draining && !track->selected);
3242 /* If we're draining, check if the pending track has enough data *or* that
3243 we've already drained out entirely */
3245 (slot->pending_track->level_time >=
3246 slot->pending_track->buffering_threshold);
3247 pending_is_ready |= slot->pending_track->eos;
/* Not ready, and the current track still has queued items: keep the
 * current track on the slot for now */
3249 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3250 GST_DEBUG_OBJECT (demux,
3251 "Replacement track '%s' doesn't have enough data for switching yet",
3252 slot->pending_track->stream_id);
3256 GST_DEBUG_OBJECT (demux,
3257 "Pending replacement track has enough data, switching");
3258 track->active = FALSE;
3259 track->draining = FALSE;
3261 /* If the stream feeding this track is stopped, flush and clear
3262 * the track now that it's going inactive. If the stream was not
3263 * found, it means we advanced past that period already (and the
3264 * stream was stopped and discarded) */
3265 stream = find_stream_for_track_locked (demux, track);
3266 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3267 gst_adaptive_demux_track_flush (track);
/* Drop the slot's reference on the old track before replacing it */
3269 gst_adaptive_demux_track_unref (track);
3270 /* We steal the reference of pending_track */
3271 track = slot->track = slot->pending_track;
3272 slot->pending_track = NULL;
3273 slot->track->active = TRUE;
3275 /* Make sure the track segment will start at the current position */
3276 track->update_next_segment = TRUE;
3278 /* Send stream start and collection, and schedule sticky events */
3279 gst_adaptive_demux_send_initial_events (demux, slot);
3281 /* Can we emit the streams-selected message now ? */
3283 all_selected_tracks_are_active (demux,
3284 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
/* TRACKS_LOCK is released only while the message is posted */
3286 TRACKS_UNLOCK (demux);
3287 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3288 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3289 TRACKS_LOCK (demux);
/* Output task function.
 *
 * With TRACKS_LOCK held it applies pending selection updates, switches
 * slots over to their pending tracks, and computes the global output
 * position as the minimum next_position over the tracks of all output
 * slots. It waits on the tracks_add condition when a non-EOS track has no
 * timed data yet, and advances the output period when every track is empty
 * and the current output period has a next period.
 *
 * Dequeued events and buffers are pushed on the slot pads with TRACKS_LOCK
 * released. The task pauses itself when the global output position is NONE
 * or when a non-OK flow result is reached, unless the demuxer is flushing. */
3295 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3298 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3299 gboolean wait_for_data = FALSE;
3300 gboolean all_tracks_empty;
3303 GST_DEBUG_OBJECT (demux, "enter");
3305 TRACKS_LOCK (demux);
3307 /* Check if stopping */
3308 if (demux->priv->flushing) {
3309 ret = GST_FLOW_FLUSHING;
3313 /* If the selection changed, handle it */
3314 check_and_handle_selection_update_locked (demux);
3318 global_output_position = GST_CLOCK_STIME_NONE;
3319 all_tracks_empty = TRUE;
3321 if (wait_for_data) {
3322 GST_DEBUG_OBJECT (demux, "Waiting for data");
/* g_cond_wait() releases tracks_lock while blocked and holds it again on
 * return, so flushing must be re-checked afterwards */
3323 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3324 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3325 if (demux->priv->flushing) {
3326 ret = GST_FLOW_FLUSHING;
3329 wait_for_data = FALSE;
3332 /* Grab/Recalculate current global output position
3333 * This is the minimum pending output position of all tracks used for output
3335 * If there is a track which is empty and not EOS, wait for it to receive data
3336 * then recalculate global output position.
3338 * This also pushes downstream all non-timed data that might be present.
3340 * IF all tracks are EOS : stop task
3342 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3343 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3344 OutputSlot *slot = (OutputSlot *) tmp->data;
3345 GstAdaptiveDemuxTrack *track;
3347 /* If there is a pending track, Check if it's time to switch to it */
3348 if (slot->pending_track)
3349 handle_slot_pending_track_switch_locked (demux, slot);
/* Read slot->track only after the possible switch above, which may have
 * replaced it */
3351 track = slot->track;
3353 if (!track->active) {
3354 /* Note: Edward: I can't see in what cases we would end up with inactive
3355 tracks assigned to slots. */
3356 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3357 g_assert (track->active);
3361 if (track->next_position == GST_CLOCK_STIME_NONE) {
3362 gst_adaptive_demux_track_update_next_position (track);
3365 GST_TRACE_OBJECT (demux,
3366 "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3367 track->stream_id, track->period_num,
3368 GST_STIME_ARGS (track->next_position));
3370 if (track->next_position != GST_CLOCK_STIME_NONE) {
3371 if (global_output_position == GST_CLOCK_STIME_NONE)
3372 global_output_position = track->next_position;
3374 global_output_position =
3375 MIN (global_output_position, track->next_position);
3376 track->waiting_add = FALSE;
3377 all_tracks_empty = FALSE;
3378 } else if (!track->eos) {
3379 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3380 track->stream_id, track->period_num);
3381 all_tracks_empty = FALSE;
3382 wait_for_data = track->waiting_add = TRUE;
3384 GST_DEBUG_OBJECT (demux,
3385 "Track %s (period %u) is EOS, not waiting for timed data",
3386 track->stream_id, track->period_num);
3388 if (gst_queue_array_get_length (track->queue) > 0) {
3389 all_tracks_empty = FALSE;
3397 if (all_tracks_empty && demux->output_period->has_next_period) {
3398 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3399 demux->output_period->period_num);
3400 if (!gst_adaptive_demux_advance_output_period (demux)) {
3401 /* Failed to move to next period, error out */
3402 ret = GST_FLOW_ERROR;
3405 /* Restart the loop */
3409 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3410 GST_STIME_ARGS (global_output_position));
3414 * We know all active tracks have pending timed data
3415 * * while track next_position <= global output position
3416 * * push pending data
3417 * * Update track next_position
3418 * * recalculate global output position
3419 * * Pop next pending data from track and update pending position
3422 gboolean need_restart = FALSE;
3424 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3425 OutputSlot *slot = (OutputSlot *) tmp->data;
3426 GstAdaptiveDemuxTrack *track = slot->track;
3428 GST_LOG_OBJECT (track->element,
3429 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3430 " global_output_position:%" GST_STIME_FORMAT, track->active,
3431 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3432 GST_STIME_ARGS (global_output_position));
/* Keep dequeuing from this track while any of these holds: there is no
 * global position, the slot has not pushed timed data yet, the track's
 * next position is not past the global position, or the track is EOS
 * with no next position */
3437 while (global_output_position == GST_CLOCK_STIME_NONE
3438 || !slot->pushed_timed_data
3439 || ((track->next_position != GST_CLOCK_STIME_NONE)
3440 && track->next_position <= global_output_position)
3441 || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3442 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3445 GST_DEBUG_OBJECT (demux,
3446 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3447 track->stream_id, track->period_num, track->eos,
3448 slot->pushed_timed_data);
3449 /* This should only happen if the track is EOS, or exactly in between
3450 * the parser outputting segment/caps before buffers. */
3451 g_assert (track->eos || !slot->pushed_timed_data);
3453 /* If we drained the track, but there's a pending track on the slot
3454 * loop again to activate it */
3455 if (slot->pending_track) {
3456 GST_DEBUG_OBJECT (demux,
3457 "Track '%s' (period %u) drained, but has a pending track to activate",
3458 track->stream_id, track->period_num);
3464 demux_update_buffering_locked (demux);
3465 demux_post_buffering_locked (demux);
/* TRACKS_LOCK is released while the dequeued object is pushed downstream
 * and taken again before the track's next position is updated */
3466 TRACKS_UNLOCK (demux);
3468 GST_DEBUG_OBJECT (demux,
3469 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3470 track->period_num, mo);
3472 if (GST_IS_EVENT (mo)) {
3473 GstEvent *event = (GstEvent *) mo;
3474 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3475 slot->pushed_timed_data = TRUE;
3476 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3477 /* If there is a pending next period, don't send the EOS */
3478 if (demux->output_period->has_next_period) {
3479 GST_LOG_OBJECT (demux,
3480 "Dropping EOS on track '%s' (period %u) before next period",
3481 track->stream_id, track->period_num);
3482 gst_event_store_mark_delivered (&track->sticky_events, event);
3483 gst_event_unref (event);
3485 /* We'll need to re-check if all tracks are empty again above */
3486 need_restart = TRUE;
3490 if (event != NULL) {
/* An extra ref is taken for the push so event can still be inspected and
 * unreffed below */
3491 gst_pad_push_event (slot->pad, gst_event_ref (event));
3493 if (GST_EVENT_IS_STICKY (event))
3494 gst_event_store_mark_delivered (&track->sticky_events, event);
3495 gst_event_unref (event);
3497 } else if (GST_IS_BUFFER (mo)) {
3498 GstBuffer *buffer = (GstBuffer *) mo;
3500 if (track->output_discont) {
3501 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3502 buffer = gst_buffer_make_writable (buffer);
3503 GST_DEBUG_OBJECT (slot->pad,
3504 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3506 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3508 track->output_discont = FALSE;
3510 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3512 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3513 slot->pad, slot->flow_ret);
3514 GST_DEBUG_OBJECT (slot->pad,
3515 "track %s (period %u) push returned %s (combined %s)",
3516 track->stream_id, track->period_num,
3517 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3518 slot->pushed_timed_data = TRUE;
3520 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3523 TRACKS_LOCK (demux);
3524 gst_adaptive_demux_track_update_next_position (track);
3526 if (ret != GST_FLOW_OK)
3531 /* Store global output position */
3532 if (global_output_position != GST_CLOCK_STIME_NONE) {
3533 demux->priv->global_output_position = global_output_position;
3535 /* And see if any streams need to be woken for more input */
3536 gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3537 global_output_position);
3543 if (global_output_position == GST_CLOCK_STIME_NONE) {
3544 if (!demux->priv->flushing) {
3545 GST_DEBUG_OBJECT (demux,
3546 "Pausing output task after reaching NONE global_output_position");
3547 gst_task_pause (demux->priv->output_task);
3551 TRACKS_UNLOCK (demux);
3552 GST_DEBUG_OBJECT (demux, "leave");
3557 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3558 /* If the flushing flag is set, then the task is being
3559 * externally stopped, so don't go to pause(), otherwise we
3560 * should so we don't keep spinning */
3561 if (!demux->priv->flushing) {
3562 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3563 gst_flow_get_name (ret));
3564 gst_task_pause (demux->priv->output_task);
3567 TRACKS_UNLOCK (demux);
/* For NOT_LINKED, and for EOS or any lower (error) flow value, an EOS
 * event is created and handed to gst_adaptive_demux_push_src_event();
 * anything other than EOS also raises an element flow error */
3569 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3570 GstEvent *eos = gst_event_new_eos ();
3572 if (ret != GST_FLOW_EOS) {
3573 GST_ELEMENT_FLOW_ERROR (demux, ret);
/* Reuse the segment seqnum for the EOS event when one has been recorded */
3576 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3577 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3578 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3579 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3581 gst_adaptive_demux_push_src_event (demux, eos);
3588 /* must be called from the scheduler */
3590 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3592 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* The result comes entirely from the subclass is_live() vfunc */
3595 return klass->is_live (demux);
/* Completion callback for a manifest download request.
 *
 * Replaces demux->manifest_uri and demux->manifest_base_uri from the
 * request, hands the downloaded buffer to the subclass
 * update_manifest_data() vfunc and, when that returns GST_FLOW_OK, posts a
 * duration-changed message if the duration is known. The manifest update
 * task is started when the demuxer is live and the subclass requires
 * periodical playlist updates, and stopped otherwise. */
3600 handle_manifest_download_complete (DownloadRequest * request,
3601 DownloadRequestState state, GstAdaptiveDemux * demux)
3603 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3605 GstFlowReturn result;
/* The old URI strings are freed first; both fields are reassigned below */
3607 g_free (demux->manifest_base_uri);
3608 g_free (demux->manifest_uri);
/* A permanent redirect makes the redirect target the new manifest URI;
 * otherwise the request URI is kept and the redirect URI is stored as the
 * base URI */
3610 if (request->redirect_permanent && request->redirect_uri) {
3611 demux->manifest_uri = g_strdup (request->redirect_uri);
3612 demux->manifest_base_uri = NULL;
3614 demux->manifest_uri = g_strdup (request->uri);
3615 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3618 buffer = download_request_take_buffer (request);
3620 /* We should always have a buffer since this function is the non-error
3621 * callback for the download */
3624 result = klass->update_manifest_data (demux, buffer);
3625 gst_buffer_unref (buffer);
3627 /* FIXME: Should the manifest uri vars be reverted to original
3628 * values if updating fails? */
3630 if (result == GST_FLOW_OK) {
3631 GstClockTime duration;
3632 /* Send an updated duration message */
3633 duration = klass->get_duration (demux);
3634 if (duration != GST_CLOCK_TIME_NONE) {
3635 GST_DEBUG_OBJECT (demux,
3636 "Sending duration message : %" GST_TIME_FORMAT,
3637 GST_TIME_ARGS (duration));
3638 gst_element_post_message (GST_ELEMENT (demux),
3639 gst_message_new_duration_changed (GST_OBJECT (demux)));
3641 GST_DEBUG_OBJECT (demux,
3642 "Duration unknown, can not send the duration message");
3645 /* If a manifest changes its liveness or periodic updateness, we need
3646 * to start/stop the manifest update task appropriately */
3647 /* Keep this condition in sync with the one in
3648 * gst_adaptive_demux_start_manifest_update_task()
3650 if (gst_adaptive_demux_is_live (demux) &&
3651 klass->requires_periodical_playlist_update (demux)) {
3652 gst_adaptive_demux_start_manifest_update_task (demux);
3654 gst_adaptive_demux_stop_manifest_update_task (demux);
/* Failure callback for a manifest download request. It currently only logs
 * a FIXME: the failure is neither retried nor turned into an error here. */
3660 handle_manifest_download_failure (DownloadRequest * request,
3661 DownloadRequestState state, GstAdaptiveDemux * demux)
3663 GST_FIXME_OBJECT (demux, "Manifest download failed.");
3664 /* Retry or error out here */
/* Manifest update implementation based on the download helper.
 *
 * Creates a download request for demux->manifest_uri, installs
 * handle_manifest_download_complete / handle_manifest_download_failure as
 * its callbacks, and submits it with the COMPRESS and FORCE_REFRESH flags.
 * ret starts as GST_FLOW_OK; when submission fails a RESOURCE/FAILED element
 * warning is posted and ret is set to GST_FLOW_NOT_LINKED. */
3667 static GstFlowReturn
3668 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3670 DownloadRequest *request;
3671 GstFlowReturn ret = GST_FLOW_OK;
3672 GError *error = NULL;
3674 request = download_request_new_uri (demux->manifest_uri);
3676 download_request_set_callbacks (request,
3677 (DownloadRequestEventCallback) handle_manifest_download_complete,
3678 (DownloadRequestEventCallback) handle_manifest_download_failure,
3681 if (!downloadhelper_submit_request (demux->download_helper, NULL,
3682 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3685 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3686 ("Failed to download manifest: %s", error->message), (NULL));
3687 g_clear_error (&error);
3689 ret = GST_FLOW_NOT_LINKED;
3695 /* must be called with manifest_lock taken */
3697 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3699 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Dispatches to the class update_manifest() vfunc and keeps its result */
3702 ret = klass->update_manifest (demux);
3707 /* must be called with manifest_lock taken */
3709 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3711 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3712 gboolean ret = FALSE;
/* has_next_period() is an optional vfunc: ret stays FALSE when the class
 * does not provide it */
3714 if (klass->has_next_period)
3715 ret = klass->has_next_period (demux);
3716 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
/* Moves the input side to the next period through the subclass
 * advance_period() vfunc (which must be implemented). Once a new input
 * period is in place, the previous period's stream tasks are stopped, the
 * collection for the new input period is updated, the selection is
 * transferred from the output period, and the streams are prepared and
 * their tasks started. */
3720 /* must be called with manifest_lock taken */
3722 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3724 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3725 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3727 g_return_if_fail (klass->advance_period != NULL);
3729 GST_DEBUG_OBJECT (demux, "Advancing to next period");
3730 /* FIXME : no return value ? What if it fails ? */
3731 klass->advance_period (demux);
/* The vfunc returns nothing, so an unchanged demux->input_period is how a
 * failed advance is detected */
3733 if (previous_period == demux->input_period) {
3734 GST_ERROR_OBJECT (demux, "Advancing period failed");
3738 /* Stop the previous period stream tasks */
3739 gst_adaptive_demux_period_stop_tasks (previous_period);
3741 gst_adaptive_demux_update_collection (demux, demux->input_period);
3742 /* Figure out a pre-emptive selection based on the output period selection */
3743 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3744 demux->output_period);
3746 gst_adaptive_demux_prepare_streams (demux, FALSE);
3747 gst_adaptive_demux_start_tasks (demux);
3751 * gst_adaptive_demux2_get_monotonic_time:
3752 * Returns: a monotonically increasing time, using the system realtime clock
3755 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
/* A NULL demux is rejected and yields GST_CLOCK_TIME_NONE */
3757 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
/* The time is read from the demuxer's realtime_clock helper */
3758 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3762 * gst_adaptive_demux2_get_client_now_utc:
3763 * @demux: #GstAdaptiveDemux
3764 * Returns: the client's estimate of UTC
3766 * Used to find the client's estimate of UTC, using the system realtime clock.
3769 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
/* NOTE(review): unlike gst_adaptive_demux2_get_monotonic_time(), demux is
 * not NULL-checked before being dereferenced here */
3771 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3775 * gst_adaptive_demux2_is_running:
3776 * @demux: #GstAdaptiveDemux
3777 * Returns: whether the demuxer is processing data
3779 * Returns FALSE if shutdown has started (transitioning down from
3780 * PAUSED), otherwise TRUE.
3783 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
/* demux->running is read atomically; no lock is taken here */
3785 return g_atomic_int_get (&demux->running);
3789 * gst_adaptive_demux2_get_qos_earliest_time:
3791 * Returns: The QOS earliest time
3796 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3798 GstClockTime earliest;
/* qos_earliest_time is copied out while holding the object lock */
3800 GST_OBJECT_LOCK (demux);
3801 earliest = demux->priv->qos_earliest_time;
3802 GST_OBJECT_UNLOCK (demux);
/* Attaches @stream to the demuxer's current input period.
 *
 * Returns FALSE if either argument is NULL or if the stream already has a
 * demux set. Under TRACKS_LOCK, an input period that is already marked as
 * prepared is treated as an error. Otherwise the stream is linked to the
 * demuxer and the input period, appended to the period's stream list, and
 * each of the stream's tracks is added to the period. */
3808 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3809 GstAdaptiveDemux2Stream * stream)
3811 g_return_val_if_fail (demux && stream, FALSE);
3813 /* FIXME : Migrate to parent */
3814 g_return_val_if_fail (stream->demux == NULL, FALSE);
3816 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3818 TRACKS_LOCK (demux);
/* Streams may only be added while the input period is not yet prepared */
3819 if (demux->input_period->prepared) {
3820 GST_ERROR_OBJECT (demux,
3821 "Attempted to add streams but no new period was created");
3822 TRACKS_UNLOCK (demux);
3825 stream->demux = demux;
3826 stream->period = demux->input_period;
3827 demux->input_period->streams =
3828 g_list_append (demux->input_period->streams, stream);
3830 if (stream->tracks) {
/* NOTE(review): when adding a track fails below, the stream has already
 * been appended to the period and linked to the demuxer; no rollback is
 * visible here - confirm this is intended */
3832 for (iter = stream->tracks; iter; iter = iter->next)
3833 if (!gst_adaptive_demux_period_add_track (demux->input_period,
3834 (GstAdaptiveDemuxTrack *) iter->data)) {
3835 GST_ERROR_OBJECT (demux, "Failed to add track elements");
3836 TRACKS_UNLOCK (demux);
3840 TRACKS_UNLOCK (demux);
3844 /* Return the current playback rate including any instant rate multiplier */
3846 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3849 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3850 rate = demux->segment.rate * demux->instant_rate_multiplier;
3851 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);