3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative medias (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be downloaded.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all accesses need to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. Moreover, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
/* Default values for the element's tunables. Except where noted they are used
 * both as the initial field values in gst_adaptive_demux_init() and as the
 * GParamSpec defaults installed in gst_adaptive_demux_class_init(). */
/* NOTE(review): DEFAULT_FAILED_COUNT is not referenced in this part of the
 * file; presumably a failure/retry limit - confirm against its users. */
115 #define DEFAULT_FAILED_COUNT 3
/* Default for "connection-bitrate" (bits/s); 0 = automatic. Divided by 1000
 * it is also the default of the kbps-based "connection-speed" property. */
116 #define DEFAULT_CONNECTION_BITRATE 0
/* Default for "bandwidth-target-ratio" (property range is 0..1). */
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
/* Defaults for "min-bitrate" / "max-bitrate" (bits/s). */
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
/* Default for "max-buffering-time", in nanoseconds. */
122 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
/* Defaults for the buffering watermarks: "high-watermark-time" and
 * "low-watermark-time" are in nanoseconds, the "*-watermark-fragments"
 * variants are in fragments. */
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0 /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
/* Defaults for the read-only "current-level-time-video" and
 * "current-level-time-audio" properties (nanoseconds). */
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
/* Accessors for the api_lock, the mutex described in the locking notes at the
 * top of this file.
 *
 * GST_API_LOCK() and GST_API_UNLOCK() expand to a single expression WITHOUT a
 * trailing semicolon: the caller supplies it. "GST_API_LOCK (d);" is therefore
 * exactly one statement, which keeps the macros safe inside unbraced if/else
 * bodies. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))
/* Property identifiers. They are the values switched on in
 * gst_adaptive_demux_set_property() / gst_adaptive_demux_get_property() and
 * the IDs passed to g_object_class_install_property() in class_init. */
/* "connection-speed": connection speed in kbps */
139 PROP_CONNECTION_SPEED,
/* "bandwidth-target-ratio" */
140 PROP_BANDWIDTH_TARGET_RATIO,
/* "connection-bitrate": connection speed in bits/s */
141 PROP_CONNECTION_BITRATE,
/* "current-bandwidth": read-only */
144 PROP_CURRENT_BANDWIDTH,
/* Buffering limits and watermarks */
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
/* "current-level-time-video" / "current-level-time-audio": read-only */
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
/* Static pad templates for the video, audio and subtitle source pads. They are
 * registered on the element class in gst_adaptive_demux_class_init(), and
 * gst_adaptive_demux_output_slot_new() creates pads from them using the same
 * "<type>_%02u" name patterns. All three accept any caps. */
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
/* An OutputSlot is allocated by gst_adaptive_demux_output_slot_new(), which
 * also stores it as the element-private data of the slot's pad, and released
 * by gst_adaptive_demux_output_slot_free(). */
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
/* This reference is dropped in gst_adaptive_demux_output_slot_free(). */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
/* May be NULL; unreffed in gst_adaptive_demux_output_slot_free() when set. */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
/* Initialised to FALSE when the slot is created. */
192 gboolean pushed_timed_data;
/* Parent (GstBin) class, fetched with g_type_class_peek_parent() in
 * gst_adaptive_demux_class_init() and used to chain up in finalize and
 * change_state. */
195 static GstBinClass *parent_class = NULL;
/* Offset of the instance-private data. It is adjusted in class_init via
 * g_type_class_adjust_private_offset() and consumed by
 * gst_adaptive_demux_get_instance_private(). */
196 static gint private_offset = 0;
/* Forward declarations of the file-local functions referenced before their
 * definitions: GObject/GstElement/GstBin virtual methods, pad functions,
 * event handlers and task management helpers. */
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214 GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222 * demux, GstEvent * event);
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230 gboolean first_and_live);
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242 gboolean stop_updates);
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249 * method to get to the padtemplates */
251 gst_adaptive_demux_ng_get_type (void)
253 static gsize type = 0;
255 if (g_once_init_enter (&type)) {
257 static const GTypeInfo info = {
258 sizeof (GstAdaptiveDemuxClass),
261 (GClassInitFunc) gst_adaptive_demux_class_init,
264 sizeof (GstAdaptiveDemux),
266 (GInstanceInitFunc) gst_adaptive_demux_init,
269 _type = g_type_register_static (GST_TYPE_BIN,
270 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
273 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
275 g_once_init_leave (&type, _type);
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
283 return (G_STRUCT_MEMBER_P (self, private_offset));
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288 const GValue * value, GParamSpec * pspec)
290 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
292 GST_OBJECT_LOCK (demux);
295 case PROP_CONNECTION_SPEED:
296 demux->connection_speed = g_value_get_uint (value) * 1000;
297 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298 demux->connection_speed);
300 case PROP_BANDWIDTH_TARGET_RATIO:
301 demux->bandwidth_target_ratio = g_value_get_float (value);
303 case PROP_MIN_BITRATE:
304 demux->min_bitrate = g_value_get_uint (value);
306 case PROP_MAX_BITRATE:
307 demux->max_bitrate = g_value_get_uint (value);
309 case PROP_CONNECTION_BITRATE:
310 demux->connection_speed = g_value_get_uint (value);
312 /* FIXME: Recalculate track and buffering levels
313 * when watermarks change? */
314 case PROP_MAX_BUFFERING_TIME:
315 demux->max_buffering_time = g_value_get_uint64 (value);
317 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
320 case PROP_BUFFERING_LOW_WATERMARK_TIME:
321 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
323 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324 demux->buffering_high_watermark_fragments = g_value_get_double (value);
326 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327 demux->buffering_low_watermark_fragments = g_value_get_double (value);
330 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
334 GST_OBJECT_UNLOCK (demux);
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339 GValue * value, GParamSpec * pspec)
341 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
343 GST_OBJECT_LOCK (demux);
346 case PROP_CONNECTION_SPEED:
347 g_value_set_uint (value, demux->connection_speed / 1000);
349 case PROP_BANDWIDTH_TARGET_RATIO:
350 g_value_set_float (value, demux->bandwidth_target_ratio);
352 case PROP_MIN_BITRATE:
353 g_value_set_uint (value, demux->min_bitrate);
355 case PROP_MAX_BITRATE:
356 g_value_set_uint (value, demux->max_bitrate);
358 case PROP_CONNECTION_BITRATE:
359 g_value_set_uint (value, demux->connection_speed);
361 case PROP_CURRENT_BANDWIDTH:
362 g_value_set_uint (value, demux->current_download_rate);
364 case PROP_MAX_BUFFERING_TIME:
365 g_value_set_uint64 (value, demux->max_buffering_time);
367 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
370 case PROP_BUFFERING_LOW_WATERMARK_TIME:
371 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
373 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374 g_value_set_double (value, demux->buffering_high_watermark_fragments);
376 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377 g_value_set_double (value, demux->buffering_low_watermark_fragments);
379 case PROP_CURRENT_LEVEL_TIME_VIDEO:
380 g_value_set_uint64 (value, demux->current_level_time_video);
382 case PROP_CURRENT_LEVEL_TIME_AUDIO:
383 g_value_set_uint64 (value, demux->current_level_time_audio);
386 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
390 GST_OBJECT_UNLOCK (demux);
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
396 GObjectClass *gobject_class;
397 GstElementClass *gstelement_class;
398 GstBinClass *gstbin_class;
400 gobject_class = G_OBJECT_CLASS (klass);
401 gstelement_class = GST_ELEMENT_CLASS (klass);
402 gstbin_class = GST_BIN_CLASS (klass);
404 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405 "Base Adaptive Demux (ng)");
407 parent_class = g_type_class_peek_parent (klass);
409 if (private_offset != 0)
410 g_type_class_adjust_private_offset (klass, &private_offset);
412 gobject_class->set_property = gst_adaptive_demux_set_property;
413 gobject_class->get_property = gst_adaptive_demux_get_property;
414 gobject_class->finalize = gst_adaptive_demux_finalize;
416 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417 g_param_spec_uint ("connection-speed", "Connection Speed",
418 "Network connection speed to use in kbps (0 = calculate from downloaded"
419 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423 g_param_spec_float ("bandwidth-target-ratio",
424 "Ratio of target bandwidth / available bandwidth",
425 "Limit of the available bitrate to use when switching to alternates",
426 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431 "Network connection speed to use (0 = automatic) (bits/s)",
432 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437 "Minimum bitrate to use when switching to alternates (bits/s)",
438 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443 "Maximum bitrate to use when switching to alternates (bits/s)",
444 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448 g_param_spec_uint ("current-bandwidth",
449 "Current download bandwidth (bits/s)",
450 "Report of current download bandwidth (based on arriving data) (bits/s)",
451 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
453 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454 g_param_spec_uint64 ("max-buffering-time",
455 "Buffering maximum size (ns)",
456 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459 G_PARAM_STATIC_STRINGS));
461 g_object_class_install_property (gobject_class,
462 PROP_BUFFERING_HIGH_WATERMARK_TIME,
463 g_param_spec_uint64 ("high-watermark-time",
464 "High buffering watermark size (ns)",
465 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_LOW_WATERMARK_TIME,
472 g_param_spec_uint64 ("low-watermark-time",
473 "Low buffering watermark size (ns)",
474 "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481 g_param_spec_double ("high-watermark-fragments",
482 "High buffering watermark size (fragments)",
483 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("low-watermark-fragments",
491 "Low buffering watermark size (fragments)",
492 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
497 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498 g_param_spec_uint64 ("current-level-time-video",
499 "Currently buffered level of video (ns)",
500 "Currently buffered level of video track(s) (ns)",
501 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503 G_PARAM_STATIC_STRINGS));
505 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506 g_param_spec_uint64 ("current-level-time-audio",
507 "Currently buffered level of audio (ns)",
508 "Currently buffered level of audio track(s) (ns)",
509 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511 G_PARAM_STATIC_STRINGS));
513 gst_element_class_add_static_pad_template (gstelement_class,
514 &gst_adaptive_demux_audiosrc_template);
515 gst_element_class_add_static_pad_template (gstelement_class,
516 &gst_adaptive_demux_videosrc_template);
517 gst_element_class_add_static_pad_template (gstelement_class,
518 &gst_adaptive_demux_subtitlesrc_template);
520 gstelement_class->change_state = gst_adaptive_demux_change_state;
521 gstelement_class->query = gst_adaptive_demux_query;
522 gstelement_class->send_event = gst_adaptive_demux_send_event;
524 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
526 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527 klass->requires_periodical_playlist_update =
528 gst_adaptive_demux_requires_periodical_playlist_update_default;
529 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534 GstAdaptiveDemuxClass * klass)
536 GstPadTemplate *pad_template;
538 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
540 demux->priv = gst_adaptive_demux_get_instance_private (demux);
541 demux->priv->input_adapter = gst_adapter_new ();
542 demux->realtime_clock = gst_adaptive_demux_clock_new ();
544 demux->download_helper = downloadhelper_new (demux->realtime_clock);
545 demux->priv->segment_seqnum = gst_util_seqnum_next ();
546 demux->have_group_id = FALSE;
547 demux->group_id = G_MAXUINT;
549 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550 demux->instant_rate_multiplier = 1.0;
552 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
553 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
555 g_rec_mutex_init (&demux->priv->manifest_lock);
557 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
559 g_mutex_init (&demux->priv->api_lock);
560 g_mutex_init (&demux->priv->segment_lock);
562 g_mutex_init (&demux->priv->tracks_lock);
563 g_cond_init (&demux->priv->tracks_add);
565 g_mutex_init (&demux->priv->buffering_lock);
567 demux->priv->periods = g_queue_new ();
570 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
571 g_return_if_fail (pad_template != NULL);
573 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
574 gst_pad_set_event_function (demux->sinkpad,
575 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
576 gst_pad_set_chain_function (demux->sinkpad,
577 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
580 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
581 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
582 demux->min_bitrate = DEFAULT_MIN_BITRATE;
583 demux->max_bitrate = DEFAULT_MAX_BITRATE;
585 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
586 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
587 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
588 demux->buffering_high_watermark_fragments =
589 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
590 demux->buffering_low_watermark_fragments =
591 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
593 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
594 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
596 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
598 demux->priv->duration = GST_CLOCK_TIME_NONE;
600 /* Output combiner */
601 demux->priv->flowcombiner = gst_flow_combiner_new ();
604 g_rec_mutex_init (&demux->priv->output_lock);
605 demux->priv->output_task =
606 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
608 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
612 gst_adaptive_demux_finalize (GObject * object)
614 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
615 GstAdaptiveDemuxPrivate *priv = demux->priv;
617 GST_DEBUG_OBJECT (object, "finalize");
619 g_object_unref (priv->input_adapter);
621 downloadhelper_free (demux->download_helper);
623 g_rec_mutex_clear (&demux->priv->manifest_lock);
624 g_mutex_clear (&demux->priv->api_lock);
625 g_mutex_clear (&demux->priv->segment_lock);
627 g_mutex_clear (&demux->priv->buffering_lock);
629 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
631 /* The input period is present after a reset, clear it now */
632 if (demux->input_period)
633 gst_adaptive_demux_period_unref (demux->input_period);
635 if (demux->realtime_clock) {
636 gst_adaptive_demux_clock_unref (demux->realtime_clock);
637 demux->realtime_clock = NULL;
639 g_object_unref (priv->output_task);
640 g_rec_mutex_clear (&priv->output_lock);
642 gst_flow_combiner_free (priv->flowcombiner);
644 g_queue_free (priv->periods);
646 G_OBJECT_CLASS (parent_class)->finalize (object);
650 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
652 gboolean ret = FALSE;
653 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
656 ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
657 gst_object_unref (parent);
663 static GstStateChangeReturn
664 gst_adaptive_demux_change_state (GstElement * element,
665 GstStateChange transition)
667 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
668 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
670 switch (transition) {
671 case GST_STATE_CHANGE_NULL_TO_READY:
672 if (!gst_adaptive_demux_check_streams_aware (demux)) {
673 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
674 (_("Element requires a streams-aware context.")), (NULL));
678 case GST_STATE_CHANGE_PAUSED_TO_READY:
679 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
680 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
682 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
683 downloadhelper_stop (demux->download_helper);
686 demux->priv->flushing = TRUE;
687 g_cond_signal (&demux->priv->tracks_add);
688 gst_task_stop (demux->priv->output_task);
689 TRACKS_UNLOCK (demux);
691 gst_task_join (demux->priv->output_task);
693 GST_API_LOCK (demux);
694 gst_adaptive_demux_reset (demux);
695 GST_API_UNLOCK (demux);
697 case GST_STATE_CHANGE_READY_TO_PAUSED:
698 GST_API_LOCK (demux);
699 gst_adaptive_demux_reset (demux);
701 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
702 if (g_atomic_int_get (&demux->priv->have_manifest))
703 gst_adaptive_demux_start_manifest_update_task (demux);
704 GST_API_UNLOCK (demux);
705 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
706 GST_DEBUG_OBJECT (demux, "demuxer has started running");
707 /* gst_task_start (demux->priv->output_task); */
713 /* this must be run with the scheduler and output tasks stopped. */
714 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
716 switch (transition) {
717 case GST_STATE_CHANGE_READY_TO_PAUSED:
718 /* Start download task */
719 downloadhelper_start (demux->download_helper);
730 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
733 GstEvent *eos = gst_event_new_eos ();
734 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
736 /* FIXME: The slot might not have output any data, caps or segment yet */
737 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
738 gst_pad_push_event (slot->pad, eos);
739 gst_pad_set_active (slot->pad, FALSE);
740 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
741 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
743 gst_adaptive_demux_track_unref (slot->track);
744 if (slot->pending_track)
745 gst_adaptive_demux_track_unref (slot->pending_track);
747 g_slice_free (OutputSlot, slot);
751 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
752 GstStreamType streamtype)
755 GstPadTemplate *tmpl;
758 switch (streamtype) {
759 case GST_STREAM_TYPE_AUDIO:
760 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
762 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
764 case GST_STREAM_TYPE_VIDEO:
765 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
767 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
769 case GST_STREAM_TYPE_TEXT:
771 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
773 gst_static_pad_template_get
774 (&gst_adaptive_demux_subtitlesrc_template);
777 g_assert_not_reached ();
781 slot = g_slice_new0 (OutputSlot);
782 slot->type = streamtype;
783 slot->pushed_timed_data = FALSE;
785 /* Create and activate new pads */
786 slot->pad = gst_pad_new_from_template (tmpl, name);
788 gst_object_unref (tmpl);
790 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
791 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
792 gst_pad_set_active (slot->pad, TRUE);
794 gst_pad_set_query_function (slot->pad,
795 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
796 gst_pad_set_event_function (slot->pad,
797 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
799 gst_pad_set_element_private (slot->pad, slot);
801 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
802 GST_DEBUG_PAD_NAME (slot->pad));
807 * * After `process_manifest` or when a period starts
808 * * Or when all tracks have been created
810 * Goes over tracks and creates the collection
812 * Returns TRUE if the collection was fully created.
814 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
817 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
818 GstAdaptiveDemuxPeriod * period)
820 GstStreamCollection *collection;
823 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
825 if (!period->tracks_changed) {
826 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
830 if (!period->tracks) {
831 GST_WARNING_OBJECT (demux, "No tracks registered/present");
835 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
836 GST_DEBUG_OBJECT (demux,
837 "Streams still have pending tracks, not creating/updating collection");
841 /* Update collection */
842 collection = gst_stream_collection_new ("adaptivedemux");
844 for (iter = period->tracks; iter; iter = iter->next) {
845 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
847 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
848 gst_stream_collection_add_stream (collection,
849 gst_object_ref (track->stream_object));
852 if (period->collection)
853 gst_object_unref (period->collection);
854 period->collection = collection;
860 * Called for the output period:
861 * * after `update_collection()` if the input period is the same as the output period
862 * * When the output period changes
864 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
867 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
869 GstStreamCollection *collection;
870 GstAdaptiveDemuxPeriod *period = demux->output_period;
871 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
873 g_return_val_if_fail (period, FALSE);
874 if (!period->collection) {
875 GST_DEBUG_OBJECT (demux, "No collection available yet");
879 collection = period->collection;
881 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
884 /* Post collection */
885 TRACKS_UNLOCK (demux);
886 GST_MANIFEST_UNLOCK (demux);
888 gst_element_post_message (GST_ELEMENT_CAST (demux),
889 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
891 GST_MANIFEST_LOCK (demux);
894 /* If no stream selection was handled, make a default selection */
895 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
896 gst_adaptive_demux_period_select_default_tracks (demux,
897 demux->output_period);
900 /* Make sure the output task is running */
901 if (gst_adaptive_demux2_is_running (demux)) {
902 demux->priv->flushing = FALSE;
903 GST_DEBUG_OBJECT (demux, "Starting the output task");
904 gst_task_start (demux->priv->output_task);
/* Processes the manifest data accumulated in priv->input_adapter.
 *
 * The API lock and the manifest lock are taken at the start. The upstream
 * URI query fills in manifest_uri / manifest_base_uri, the buffered data is
 * handed to the subclass' process_manifest() vfunc and a statistics element
 * message is posted. On the success path the input period becomes the output
 * period, the stream collection is updated and posted, the streams are
 * prepared and the download and manifest-update tasks are started. The error
 * paths log a warning and/or post an element error. */
911 handle_incoming_manifest (GstAdaptiveDemux * demux)
913 GstAdaptiveDemuxClass *demux_class;
918 GstBuffer *manifest_buffer;
/* Lock order: API lock first, then the manifest lock (released in reverse) */
920 GST_API_LOCK (demux);
921 GST_MANIFEST_LOCK (demux);
923 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
925 available = gst_adapter_available (demux->priv->input_adapter);
928 goto eos_without_data;
930 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
932 /* Need to get the URI to use it as a base to generate the fragment's
934 query = gst_query_new_uri ();
935 query_res = gst_pad_peer_query (demux->sinkpad, query);
937 gchar *uri, *redirect_uri;
940 gst_query_parse_uri (query, &uri);
941 gst_query_parse_uri_redirection (query, &redirect_uri);
942 gst_query_parse_uri_redirection_permanent (query, &permanent);
944 if (permanent && redirect_uri) {
945 demux->manifest_uri = redirect_uri;
946 demux->manifest_base_uri = NULL;
949 demux->manifest_uri = uri;
950 demux->manifest_base_uri = redirect_uri;
953 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
954 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
956 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
958 gst_query_unref (query);
960 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
961 if (!demux->have_group_id) {
962 demux->have_group_id = TRUE;
963 demux->group_id = gst_util_group_id_next ();
966 /* Let the subclass parse the manifest */
968 gst_adapter_take_buffer (demux->priv->input_adapter, available);
969 ret = demux_class->process_manifest (demux, manifest_buffer);
970 gst_buffer_unref (manifest_buffer);
/* Post a statistics message carrying the manifest URI and a download-stop
 * timestamp taken now */
972 gst_element_post_message (GST_ELEMENT_CAST (demux),
973 gst_message_new_element (GST_OBJECT_CAST (demux),
974 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
975 "manifest-uri", G_TYPE_STRING,
976 demux->manifest_uri, "uri", G_TYPE_STRING,
978 "manifest-download-start", GST_TYPE_CLOCK_TIME,
980 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
981 gst_util_get_timestamp (), NULL)));
984 goto invalid_manifest;
986 /* Streams should have been added to the input period if the manifest parsing
988 if (!demux->input_period->streams)
991 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
993 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
994 /* Send duration message */
995 if (!gst_adaptive_demux_is_live (demux)) {
996 GstClockTime duration = demux_class->get_duration (demux);
998 demux->priv->duration = duration;
999 if (duration != GST_CLOCK_TIME_NONE) {
1000 GST_DEBUG_OBJECT (demux,
1001 "Sending duration message : %" GST_TIME_FORMAT,
1002 GST_TIME_ARGS (duration));
1003 gst_element_post_message (GST_ELEMENT (demux),
1004 gst_message_new_duration_changed (GST_OBJECT (demux)));
1006 GST_DEBUG_OBJECT (demux,
1007 "media duration unknown, can not send the duration message");
1011 TRACKS_LOCK (demux);
1012 /* New streams/tracks will have been added to the input period */
1013 /* The input period has streams, make it the active output period */
1014 /* FIXME : Factorize this into a function to make a period active */
1015 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1016 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1017 gst_adaptive_demux_post_collection (demux);
1018 TRACKS_UNLOCK (demux);
/* The live-ness of the stream is passed as the first_and_live argument */
1020 gst_adaptive_demux_prepare_streams (demux,
1021 gst_adaptive_demux_is_live (demux));
1022 gst_adaptive_demux_start_tasks (demux);
1023 gst_adaptive_demux_start_manifest_update_task (demux);
1026 GST_MANIFEST_UNLOCK (demux);
1027 GST_API_UNLOCK (demux);
1034 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1042 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1043 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1044 (_("This file contains no playable streams.")),
1045 ("No known stream formats found at the Manifest"));
/* This path drops both locks itself before posting the element error */
1052 GST_MANIFEST_UNLOCK (demux);
1053 GST_API_UNLOCK (demux);
1055 /* In most cases, this will happen if we set a wrong url in the
1056 * source element and we have received the 404 HTML response instead of
1058 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1063 struct http_headers_collector
1065 GstAdaptiveDemux *demux;
1070 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1071 const GValue * value, gpointer userdata)
1073 struct http_headers_collector *hdr_data = userdata;
1074 GstAdaptiveDemux *demux = hdr_data->demux;
1075 const gchar *field_name = g_quark_to_string (field_id);
1077 if (G_UNLIKELY (value == NULL))
1078 return TRUE; /* This should not happen */
1080 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1081 const gchar *user_agent = g_value_get_string (value);
1083 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1084 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1087 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1088 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1089 guint i = 0, prev_len = 0, total_len = 0;
1090 gchar **cookies = NULL;
1092 if (hdr_data->cookies != NULL)
1093 prev_len = g_strv_length (hdr_data->cookies);
1095 if (GST_VALUE_HOLDS_ARRAY (value)) {
1096 total_len = gst_value_array_get_size (value) + prev_len;
1097 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1099 for (i = 0; i < gst_value_array_get_size (value); i++) {
1100 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1101 g_value_get_string (gst_value_array_get_value (value, i)));
1102 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1104 } else if (G_VALUE_HOLDS_STRING (value)) {
1105 total_len = 1 + prev_len;
1106 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1108 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1109 g_value_get_string (value));
1110 cookies[0] = g_value_dup_string (value);
1112 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1113 g_quark_to_string (field_id));
1119 for (j = 0; j < prev_len; j++) {
1120 GST_DEBUG_OBJECT (demux,
1121 "Append existing cookie %s", hdr_data->cookies[j]);
1122 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1125 cookies[total_len] = NULL;
1127 g_strfreev (hdr_data->cookies);
1128 hdr_data->cookies = cookies;
1132 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1133 const gchar *referer = g_value_get_string (value);
1134 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1136 downloadhelper_set_referer (demux->download_helper, referer);
1139 /* Date header can be used to estimate server offset */
1140 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1141 const gchar *http_date = g_value_get_string (value);
1144 GstDateTime *datetime =
1145 gst_adaptive_demux_util_parse_http_head_date (http_date);
1148 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1149 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1151 GST_INFO_OBJECT (demux,
1152 "HTTP response Date %s", GST_STR_NULL (date_string));
1153 g_free (date_string);
1155 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1157 g_date_time_unref (utc_now);
1158 gst_date_time_unref (datetime);
/* Event handler of the sink pad.
 *
 * FLUSH_STOP resets the demuxer while holding the API and manifest locks.
 * STREAM_START and SEGMENT events are swallowed (the group id of the
 * stream-start is recorded when present). Sticky "http-headers" events have
 * their request and response headers walked with
 * gst_adaptive_demux_handle_upstream_http_header() and the collected cookies
 * are handed to the download helper. Events that are not consumed end up in
 * gst_pad_event_default(). */
1167 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1170 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1173 switch (event->type) {
1174 case GST_EVENT_FLUSH_STOP:{
1175 GST_API_LOCK (demux);
1176 GST_MANIFEST_LOCK (demux);
1178 gst_adaptive_demux_reset (demux);
/* The event is forwarded while both locks are still held */
1180 ret = gst_pad_event_default (pad, parent, event);
1182 GST_MANIFEST_UNLOCK (demux);
1183 GST_API_UNLOCK (demux);
/* NOTE(review): the case label of this branch is not visible here;
 * handle_incoming_manifest() logs "Got EOS on the sink pad", so this is
 * presumably the EOS case - confirm. The manifest is processed with the
 * scheduler lock held; on failure the event goes to the default handler. */
1189 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1190 if (!handle_incoming_manifest (demux)) {
1191 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1192 return gst_pad_event_default (pad, parent, event);
1194 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1196 GST_ERROR_OBJECT (demux,
1197 "Failed to acquire scheduler to handle manifest");
1198 return gst_pad_event_default (pad, parent, event);
/* Manifest handled: the event is consumed here instead of being forwarded */
1200 gst_event_unref (event);
1203 case GST_EVENT_STREAM_START:
1204 if (gst_event_parse_group_id (event, &demux->group_id))
1205 demux->have_group_id = TRUE;
1207 demux->have_group_id = FALSE;
1208 /* Swallow stream-start, we'll push our own */
1209 gst_event_unref (event);
1211 case GST_EVENT_SEGMENT:
1212 /* Swallow newsegments, we'll push our own */
1213 gst_event_unref (event);
1215 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1216 const GstStructure *structure = gst_event_get_structure (event);
/* The collector starts without cookies; the foreach callback fills it in */
1217 struct http_headers_collector c = { demux, NULL };
1219 if (gst_structure_has_name (structure, "http-headers")) {
1220 if (gst_structure_has_field (structure, "request-headers")) {
1221 GstStructure *req_headers = NULL;
1222 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1223 &req_headers, NULL);
1225 gst_structure_foreach (req_headers,
1226 gst_adaptive_demux_handle_upstream_http_header, &c);
/* req_headers was obtained through gst_structure_get() and is released here */
1227 gst_structure_free (req_headers);
1230 if (gst_structure_has_field (structure, "response-headers")) {
1231 GstStructure *res_headers = NULL;
1232 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1233 &res_headers, NULL);
1235 gst_structure_foreach (res_headers,
1236 gst_adaptive_demux_handle_upstream_http_header, &c);
1237 gst_structure_free (res_headers);
/* Cookies gathered from both header sets go to the download helper */
1242 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1250 return gst_pad_event_default (pad, parent, event);
1253 static GstFlowReturn
1254 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1257 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1259 GST_MANIFEST_LOCK (demux);
1261 gst_adapter_push (demux->priv->input_adapter, buffer);
1263 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1264 (gint) gst_adapter_available (demux->priv->input_adapter));
1266 GST_MANIFEST_UNLOCK (demux);
1271 /* Called with TRACKS_LOCK taken */
1273 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1277 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1278 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1280 gst_adaptive_demux_track_flush (track);
1281 if (gst_pad_is_active (track->sinkpad)) {
1282 gst_pad_set_active (track->sinkpad, FALSE);
1283 gst_pad_set_active (track->sinkpad, TRUE);
/* Takes TRACKS_LOCK and runs gst_adaptive_demux_period_reset_tracks() on
 * every period queued in demux->priv->periods. */
1288 /* Resets all tracks to their initial state, ready to receive new data. */
1290 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1292 TRACKS_LOCK (demux);
/* The callback is cast to GFunc; no user data is passed (NULL) */
1293 g_queue_foreach (demux->priv->periods,
1294 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1295 TRACKS_UNLOCK (demux);
/* Makes sure demux->input_period refers to a period that is not prepared yet.
 *
 * An existing input period that has not been prepared is reused as-is.
 * Otherwise the current input period (if any) is flagged with
 * has_next_period and a new period is created with
 * gst_adaptive_demux_period_new(). */
1298 /* Subclasses will call this function to ensure that a new input period is
1299 * available to receive new streams and tracks */
1301 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
/* An unprepared input period can still receive streams and tracks */
1303 if (demux->input_period && !demux->input_period->prepared) {
1304 GST_DEBUG_OBJECT (demux, "Using existing input period");
1308 if (demux->input_period) {
1309 GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1310 demux->input_period->has_next_period = TRUE;
1312 GST_DEBUG_OBJECT (demux, "Setting up new period");
1314 demux->input_period = gst_adaptive_demux_period_new (demux);
/* Brings the demuxer back to its initial state.
 *
 * Stops the tasks, calls the subclass' reset vfunc, frees every output slot,
 * drops all periods (plus the extra reference held on the output period),
 * starts a fresh input period and clears the manifest URIs, the input
 * adapter, the segment, the buffering state, the group id and the per-type
 * stream counters. */
1319 /* must be called with manifest_lock taken */
1321 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1323 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1326 gst_adaptive_demux_stop_tasks (demux, TRUE);
1329 klass->reset (demux);
1331 /* Disable and remove all outputs */
1332 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1333 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1334 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
/* The slots were freed above, only the list nodes remain */
1336 g_list_free (demux->priv->outputs);
1337 demux->priv->outputs = NULL;
/* Unrefs every period held by the queue */
1339 g_queue_clear_full (demux->priv->periods,
1340 (GDestroyNotify) gst_adaptive_demux_period_unref);
1342 /* The output period always has an extra ref taken on it */
1343 if (demux->output_period)
1344 gst_adaptive_demux_period_unref (demux->output_period);
1345 demux->output_period = NULL;
1346 /* The input period doesn't have an extra ref taken on it */
1347 demux->input_period = NULL;
/* With input_period cleared, this sets up a brand new input period */
1349 gst_adaptive_demux_start_new_period (demux);
1351 g_free (demux->manifest_uri);
1352 g_free (demux->manifest_base_uri);
1353 demux->manifest_uri = NULL;
1354 demux->manifest_base_uri = NULL;
1356 gst_adapter_clear (demux->priv->input_adapter);
1357 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1359 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1360 demux->instant_rate_multiplier = 1.0;
1362 demux->priv->duration = GST_CLOCK_TIME_NONE;
/* Start out in the buffering state, with no percentage computed yet */
1364 demux->priv->percent = -1;
1365 demux->priv->is_buffering = TRUE;
1367 demux->have_group_id = FALSE;
1368 demux->group_id = G_MAXUINT;
1369 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1371 demux->priv->global_output_position = 0;
1373 demux->priv->n_audio_streams = 0;
1374 demux->priv->n_video_streams = 0;
1375 demux->priv->n_subtitle_streams = 0;
1377 gst_flow_combiner_reset (demux->priv->flowcombiner);
/* Element-level query handler.
 *
 * BUFFERING queries that arrive before an output period exists are handled
 * here: the requested format is read from the query and non-TIME formats are
 * reported (in the debug log) as not answerable. Queries that are not
 * handled here are passed to the parent class' query implementation. */
1381 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1383 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1385 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1387 switch (GST_QUERY_TYPE (query)) {
1388 case GST_QUERY_BUFFERING:
/* Only the format of the buffering range is of interest */
1391 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1393 if (!demux->output_period) {
1394 if (format != GST_FORMAT_TIME) {
1395 GST_DEBUG_OBJECT (demux,
1396 "No period setup yet, can't answer non-TIME buffering queries");
1400 GST_DEBUG_OBJECT (demux,
1401 "No period setup yet, but still answering buffering query");
1409 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1413 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1415 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1416 gboolean res = FALSE;
1418 GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1420 switch (GST_EVENT_TYPE (event)) {
1421 case GST_EVENT_SEEK:
1423 res = gst_adaptive_demux_handle_seek_event (demux, event);
1426 case GST_EVENT_SELECT_STREAMS:
1428 res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1432 res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1438 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1439 static GstAdaptiveDemux2Stream *
1440 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1444 /* We only look in the streams of the input period (i.e. with active streams) */
1445 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1446 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1447 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
/* Handles a stream-collection message coming from inside one of the input
 * period's streams (looked up with find_stream_for_element_locked()).
 *
 * With the manifest and tracks locks held, the stream gets to handle the
 * parsed collection. If that activated pending tracks, the demuxer
 * collection is updated (and posted when the input period is also the output
 * period) and, once the input period has no pending tracks left, every other
 * selected stream is started. The parsed collection and @msg are unreffed
 * at the end. */
1456 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1459 GstAdaptiveDemux2Stream *stream;
1460 GstStreamCollection *collection = NULL;
1461 gboolean pending_tracks_activated = FALSE;
1463 GST_MANIFEST_LOCK (demux);
1465 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1466 if (stream == NULL) {
1467 GST_WARNING_OBJECT (demux,
1468 "Failed to locate stream for collection message");
1472 gst_message_parse_stream_collection (msg, &collection);
/* Lock order: manifest lock (taken above), then tracks lock */
1476 TRACKS_LOCK (demux);
1478 if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1479 &pending_tracks_activated)) {
1480 TRACKS_UNLOCK (demux);
1482 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1483 (_("Stream format can't be handled")),
1484 ("The streams provided by the multiplex are ambiguous"));
1488 if (pending_tracks_activated) {
1489 /* If pending tracks were handled, then update the demuxer collection */
1490 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1491 demux->input_period == demux->output_period) {
1492 gst_adaptive_demux_post_collection (demux);
1495 /* If we discovered pending tracks and we no longer have any, we can ensure
1496 * selected tracks are started */
1497 if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1498 GList *iter = demux->input_period->streams;
1499 for (; iter; iter = iter->next) {
1500 GstAdaptiveDemux2Stream *new_stream = iter->data;
1502 /* The stream that posted this collection was already started. If a
1503 * different stream is now selected, start it */
1504 if (stream != new_stream
1505 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1506 gst_adaptive_demux2_stream_start (new_stream);
1510 TRACKS_UNLOCK (demux);
1513 GST_MANIFEST_UNLOCK (demux);
/* Release the parsed collection and consume the message */
1516 gst_object_unref (collection);
1517 gst_message_unref (msg);
/* GstBin message handler of the demuxer.
 *
 * Stream-collection messages are routed to
 * gst_adaptive_demux_handle_stream_collection_msg(). Error messages whose
 * source belongs to a stream of the input period are intercepted: the debug
 * string is appended to the error text, the "http-status-code" detail is
 * stored in stream->last_status_code and the error is handed to
 * gst_adaptive_demux2_stream_parse_error() under the scheduler lock.
 * Messages that are not consumed are presumably the ones that reach the
 * parent class handler at the end - confirm against the full function. */
1522 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1524 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1526 switch (GST_MESSAGE_TYPE (msg)) {
1527 case GST_MESSAGE_STREAM_COLLECTION:
1529 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1532 case GST_MESSAGE_ERROR:{
1533 GstAdaptiveDemux2Stream *stream = NULL;
1535 gchar *debug = NULL;
1536 gchar *new_error = NULL;
1537 const GstStructure *details = NULL;
1539 GST_MANIFEST_LOCK (demux);
1541 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1542 if (stream == NULL) {
1543 GST_WARNING_OBJECT (demux,
1544 "Failed to locate stream for errored element");
1545 GST_MANIFEST_UNLOCK (demux);
1549 gst_message_parse_error (msg, &err, &debug);
1551 GST_WARNING_OBJECT (demux,
1552 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1553 err->message, debug);
/* Fold the debug string into the error text that is passed on */
1556 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1558 g_free (err->message);
1559 err->message = new_error;
1562 gst_message_parse_error_details (msg, &details);
/* Remember the HTTP status code reported with the error */
1564 gst_structure_get_uint (details, "http-status-code",
1565 &stream->last_status_code);
1568 /* error, but ask to retry */
1569 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1570 gst_adaptive_demux2_stream_parse_error (stream, err);
1571 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1577 GST_MANIFEST_UNLOCK (demux);
/* The error was dealt with here, so the message is consumed */
1579 gst_message_unref (msg);
1588 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
/* Returns the period start time reported by the subclass through its
 * get_period_start_time() vfunc. */
1591 /* must be called with manifest_lock taken */
1593 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1595 GstAdaptiveDemuxClass *klass;
1597 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* The vfunc is optional: a subclass may leave it unset */
1599 if (klass->get_period_start_time == NULL)
1602 return klass->get_period_start_time (demux);
/* Prepares the streams of the input period.
 *
 * Every stream is flagged as needing a header and as discontinuous. Streams
 * that are selected or have pending tracks get their fragment info updated
 * and the smallest fragment stream time is collected in min_stream_time.
 * For the first_and_live case the demuxer segment is seeked to
 * min_stream_time + period_start. Finally all streams receive the same
 * start/current position, are told to compute their segment, and the input
 * period is marked as prepared. */
1605 /* must be called with manifest_lock taken */
1607 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1608 gboolean first_and_live)
1611 GstClockTime period_start;
1612 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
/* Preconditions: the input period has streams and was not prepared before */
1615 g_return_val_if_fail (demux->input_period->streams, FALSE);
1616 g_assert (demux->input_period->prepared == FALSE);
1618 new_streams = demux->input_period->streams;
1620 if (!gst_adaptive_demux2_is_running (demux)) {
1621 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1625 GST_DEBUG_OBJECT (demux,
1626 "Preparing %d streams for period %d , first_and_live:%d",
1627 g_list_length (new_streams), demux->input_period->period_num,
1630 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1631 GstAdaptiveDemux2Stream *stream = iter->data;
1633 GST_DEBUG_OBJECT (stream, "Preparing stream");
1635 stream->need_header = TRUE;
1636 stream->discont = TRUE;
1638 /* Grab the first stream time for live streams
1639 * * If the stream is selected
1640 * * Or it provides dynamic tracks (in which case we need to force an update)
1643 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1644 || stream->pending_tracks)) {
1645 /* TODO we only need the first timestamp, maybe create a simple function to
1646 * get the current PTS of a fragment ? */
1647 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1648 gst_adaptive_demux2_stream_update_fragment_info (stream);
1650 GST_DEBUG_OBJECT (stream,
1651 "Got stream time %" GST_STIME_FORMAT,
1652 GST_STIME_ARGS (stream->fragment.stream_time));
/* Keep the smallest stream time seen across the streams */
1654 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1655 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1657 min_stream_time = stream->fragment.stream_time;
/* period_start is the offset added to the stream times below */
1662 period_start = gst_adaptive_demux_get_period_start_time (demux);
1664 /* For live streams, the subclass is supposed to seek to the current fragment
1665 * and then tell us its stream time in stream->fragment.stream_time. We now
1666 * also have to seek our demuxer segment to reflect this.
1668 * FIXME: This needs some refactoring at some point.
1670 if (first_and_live) {
1671 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1672 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1673 GST_SEEK_TYPE_NONE, -1, NULL);
1676 GST_DEBUG_OBJECT (demux,
1677 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1678 " demux segment %" GST_SEGMENT_FORMAT,
1679 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1682 /* Synchronize stream start/current positions */
1683 if (min_stream_time == GST_CLOCK_STIME_NONE)
1684 min_stream_time = period_start;
1686 min_stream_time += period_start;
/* All streams of the period share the same start and current position */
1687 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1688 GstAdaptiveDemux2Stream *stream = iter->data;
1689 stream->start_position = stream->current_position = min_stream_time;
1692 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1693 GstAdaptiveDemux2Stream *stream = iter->data;
1694 stream->compute_segment = TRUE;
1695 stream->first_and_live = first_and_live;
1697 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
/* From here on the g_assert at the top would trip for this period */
1698 demux->input_period->prepared = TRUE;
1703 static GstAdaptiveDemuxTrack *
1704 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1708 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1709 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1710 if (!g_strcmp0 (track->stream_id, stream_id))
/* Recomputes the buffering state from the tracks of the output period.
 *
 * For the active and selected tracks this gathers the lowest level time
 * overall, the lowest level time of the video and audio tracks (published as
 * current_level_time_video / current_level_time_audio under the object
 * lock) and the lowest fill percentage, computed as
 * level_time * 100 / buffering_threshold. The result updates priv->percent,
 * priv->is_buffering and priv->percent_changed. */
1717 /* TRACKS_LOCK hold */
1719 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1721 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1722 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1723 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1725 gint min_percent = -1, percent;
1726 gboolean all_eos = TRUE;
1728 /* Go over all active tracks of the output period and update level */
1730 /* Check that all tracks are above their respective low thresholds (different
1731 * tracks may have different fragment durations yielding different buffering
1732 * percentages) Overall buffering percent is the lowest. */
1733 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1734 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1736 GST_LOG_OBJECT (demux,
1737 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1738 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1739 track->stream_id, track->period_num, track->active, track->selected,
1740 track->eos, GST_TIME_ARGS (track->level_time),
1741 GST_TIME_ARGS (track->buffering_threshold));
1743 if (track->active && track->selected) {
1748 if (min_level_time == GST_CLOCK_TIME_NONE) {
1749 min_level_time = track->level_time;
1750 } else if (track->level_time < min_level_time) {
1751 min_level_time = track->level_time;
/* The per-type levels start at GST_CLOCK_TIME_NONE, so the '>' comparisons
 * below end up keeping the smallest level seen for that type */
1754 if (track->type & GST_STREAM_TYPE_VIDEO
1755 && video_level_time > track->level_time)
1756 video_level_time = track->level_time;
1758 if (track->type & GST_STREAM_TYPE_AUDIO
1759 && audio_level_time > track->level_time)
1760 audio_level_time = track->level_time;
/* A zero threshold is skipped, which also avoids dividing by zero */
1762 if (track->level_time != GST_CLOCK_TIME_NONE
1763 && track->buffering_threshold != 0) {
1765 gst_util_uint64_scale (track->level_time, 100,
1766 track->buffering_threshold);
1767 if (min_percent < 0 || cur_percent < min_percent)
1768 min_percent = cur_percent;
1774 GST_DEBUG_OBJECT (demux,
1775 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1776 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1778 /* Update demuxer video/audio level properties */
1779 GST_OBJECT_LOCK (demux);
1780 demux->current_level_time_video = video_level_time;
1781 demux->current_level_time_audio = audio_level_time;
1782 GST_OBJECT_UNLOCK (demux);
1784 if (min_percent < 0 && !all_eos)
1787 if (min_percent > 100 || all_eos)
1790 percent = MAX (0, min_percent);
1792 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1794 if (demux->priv->is_buffering) {
1796 demux->priv->is_buffering = FALSE;
1797 if (demux->priv->percent != percent) {
1798 demux->priv->percent = percent;
1799 demux->priv->percent_changed = TRUE;
/* Not buffering so far: enter buffering once the level drops below 1% */
1801 } else if (percent < 1) {
1802 demux->priv->is_buffering = TRUE;
1803 if (demux->priv->percent != percent) {
1804 demux->priv->percent = percent;
1805 demux->priv->percent_changed = TRUE;
1809 if (demux->priv->percent_changed)
1810 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1811 demux->priv->is_buffering);
1814 /* With TRACKS_LOCK held */
1816 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1821 if (!demux->priv->percent_changed)
1824 BUFFERING_LOCK (demux);
1825 percent = demux->priv->percent;
1826 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1827 TRACKS_UNLOCK (demux);
1828 gst_element_post_message ((GstElement *) demux, msg);
1830 BUFFERING_UNLOCK (demux);
1831 TRACKS_LOCK (demux);
1832 if (percent == demux->priv->percent)
1833 demux->priv->percent_changed = FALSE;
1836 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1837 GstAdaptiveDemux2Stream *
1838 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1839 GstAdaptiveDemuxTrack * track)
1843 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1844 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1845 if (g_list_find (stream->tracks, track))
/* Works on demux->input_period, demux->output_period, the priv->periods
 * queue and the priv->outputs slots: the input period gets its collection
 * updated, inherits the selection of the output period and is prepared
 * (first_and_live FALSE); periods between the first and last queue entries
 * are removed; pending tracks of all output slots are released. */
1852 /* Called from seek handler
1854 * This function is used when a (flushing) seek caused a new period to be activated.
1856 * This will ensure that:
1857 * * the current output period is marked as finished (EOS)
1858 * * Any potential intermediate (non-input/non-output) periods are removed
1859 * * That the new input period is prepared and ready
1862 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1866 GST_DEBUG_OBJECT (demux,
1867 "Preparing new input period %u", demux->input_period->period_num);
1869 /* Prepare the new input period */
1870 gst_adaptive_demux_update_collection (demux, demux->input_period);
1872 /* Transfer the previous selection to the new input period */
1873 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1874 demux->output_period);
1875 gst_adaptive_demux_prepare_streams (demux, FALSE);
1877 /* Remove all periods except for the input (last) and output (first) period */
1878 while (demux->priv->periods->length > 2) {
/* Index 1 is always the entry right after the output (first) period */
1879 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1880 /* Mark all tracks of the removed period as not selected and EOS so they
1881 * will be skipped / ignored */
1882 for (iter = period->tracks; iter; iter = iter->next) {
1883 GstAdaptiveDemuxTrack *track = iter->data;
1884 track->selected = FALSE;
/* Drop the reference that the queue held on the removed period */
1887 gst_adaptive_demux_period_unref (period);
1890 /* Mark all tracks of the output period as EOS so that the output loop
1891 * will immediately move to the new period */
1892 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1893 GstAdaptiveDemuxTrack *track = iter->data;
1897 /* Go over all slots, and clear any pending track */
1898 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1899 OutputSlot *slot = (OutputSlot *) iter->data;
1901 if (slot->pending_track != NULL) {
1902 GST_DEBUG_OBJECT (demux,
1903 "Removing track '%s' as pending from output of current track '%s'",
1904 slot->pending_track->stream_id, slot->track->stream_id);
/* Release the slot's reference on the pending track */
1905 gst_adaptive_demux_track_unref (slot->pending_track);
1906 slot->pending_track = NULL;
/* Thin wrapper around the subclass' get_live_seek_range() vfunc: the
 * range_start / range_stop out-parameters are passed straight through and
 * the vfunc's result is returned. */
1911 /* must be called with manifest_lock taken */
1913 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1914 gint64 * range_start, gint64 * range_stop)
1916 GstAdaptiveDemuxClass *klass;
1918 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Bails out with FALSE when the subclass does not implement the vfunc */
1920 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1922 return klass->get_live_seek_range (demux, range_start, range_stop);
/* Tells whether stream->current_position lies inside the live seek range
 * obtained from gst_adaptive_demux_get_live_seek_range(). Both ends of the
 * range are inclusive. */
1925 /* must be called with manifest_lock taken */
1927 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1928 GstAdaptiveDemux2Stream * stream)
1930 gint64 range_start, range_stop;
/* The range values are only used when the lookup succeeds */
1931 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1932 GST_LOG_OBJECT (stream,
1933 "stream position %" GST_TIME_FORMAT " live seek range %"
1934 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1935 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1936 GST_STIME_ARGS (range_stop));
1937 return (stream->current_position >= range_start
1938 && stream->current_position <= range_stop);
/* Reports whether seeking is possible: a live stream needs the subclass to
 * implement get_live_seek_range(), anything else needs the seek() vfunc. */
1944 /* must be called with manifest_lock taken */
1946 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1948 GstAdaptiveDemuxClass *klass;
1950 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Live case: decided by the presence of get_live_seek_range() alone */
1951 if (gst_adaptive_demux_is_live (demux)) {
1952 return klass->get_live_seek_range != NULL;
1955 return klass->seek != NULL;
/* Puts every stream of the input period into the RESTART state and chooses
 * its start_position: segment.start for forward playback when a start
 * position was requested (start_type is not NONE), segment.stop for reverse
 * playback when a stop position was requested, and 0 otherwise. */
1959 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1960 GstSeekType start_type, GstSeekType stop_type)
1964 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1965 GstAdaptiveDemux2Stream *stream = iter->data;
1967 /* Make sure the download loop clears and restarts on the next start,
1968 * which will recompute the stream segment */
1969 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1970 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1971 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
/* Default position, overridden below depending on the playback direction */
1972 stream->start_position = 0;
1974 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1975 stream->start_position = demux->segment.start;
1976 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1977 stream->start_position = demux->segment.stop;
/* Non-zero when the seek flags @f contain any of the snap flags, the
 * key-unit flag or the trickmode key-units flag.
 *
 * The argument is parenthesized so that a compound expression such as
 * IS_SNAP_SEEK (a | b) is masked as a whole ('&' binds tighter than '|'). */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |          \
                                GST_SEEK_FLAG_SNAP_AFTER |           \
                                GST_SEEK_FLAG_SNAP_NEAREST |         \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS |  \
                                GST_SEEK_FLAG_KEY_UNIT))
/* The seek flags @f with the three SNAP_* flags cleared; every other flag
 * is kept untouched. */
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE |    \
                                      GST_SEEK_FLAG_SNAP_AFTER |     \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1991 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1994 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1998 GstSeekType start_type, stop_type;
2003 GstSegment oldsegment;
2004 GstEvent *flush_event;
2006 GST_INFO_OBJECT (demux, "Received seek event");
2008 GST_API_LOCK (demux);
2010 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2013 if (format != GST_FORMAT_TIME) {
2014 GST_API_UNLOCK (demux);
2015 GST_WARNING_OBJECT (demux,
2016 "Adaptive demuxers only support TIME-based seeking");
2017 gst_event_unref (event);
2021 if (flags & GST_SEEK_FLAG_SEGMENT) {
2022 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2023 GST_API_UNLOCK (demux);
2024 gst_event_unref (event);
2028 seqnum = gst_event_get_seqnum (event);
2030 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2031 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2035 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2036 /* For instant rate seeks, reply directly and update
2037 * our segment so the new rate is reflected in any future
2040 gdouble rate_multiplier;
2042 /* instant rate change only supported if direction does not change. All
2043 * other requirements are already checked before creating the seek event
2044 * but let's double-check here to be sure */
2045 if ((demux->segment.rate > 0 && rate < 0) ||
2046 (demux->segment.rate < 0 && rate > 0) ||
2047 start_type != GST_SEEK_TYPE_NONE ||
2048 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2049 GST_ERROR_OBJECT (demux,
2050 "Instant rate change seeks only supported in the "
2051 "same direction, without flushing and position change");
2052 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2053 GST_API_UNLOCK (demux);
2057 rate_multiplier = rate / demux->segment.rate;
2059 ev = gst_event_new_instant_rate_change (rate_multiplier,
2060 (GstSegmentFlags) flags);
2061 gst_event_set_seqnum (ev, seqnum);
2063 ret = gst_adaptive_demux_push_src_event (demux, ev);
2066 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2067 demux->instant_rate_multiplier = rate_multiplier;
2068 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2071 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2072 GST_API_UNLOCK (demux);
2073 gst_event_unref (event);
2078 if (!gst_adaptive_demux_can_seek (demux)) {
2079 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2081 GST_API_UNLOCK (demux);
2082 gst_event_unref (event);
2086 /* We can only accept flushing seeks from this point onward */
2087 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2088 GST_ERROR_OBJECT (demux,
2089 "Non-flushing non-instant-rate seeks are not possible");
2091 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2093 GST_API_UNLOCK (demux);
2094 gst_event_unref (event);
2098 if (gst_adaptive_demux_is_live (demux)) {
2099 gint64 range_start, range_stop;
2100 gboolean changed = FALSE;
2101 gboolean start_valid = TRUE, stop_valid = TRUE;
2103 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2105 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2106 GST_API_UNLOCK (demux);
2107 gst_event_unref (event);
2108 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2112 GST_DEBUG_OBJECT (demux,
2113 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2114 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2116 /* Handle relative positioning for live streams (relative to the range_stop) */
2117 if (start_type == GST_SEEK_TYPE_END) {
2118 start = range_stop + start;
2119 start_type = GST_SEEK_TYPE_SET;
2122 if (stop_type == GST_SEEK_TYPE_END) {
2123 stop = range_stop + stop;
2124 stop_type = GST_SEEK_TYPE_SET;
2128 /* Adjust the requested start/stop position if it falls beyond the live
2130 * The only case where we don't adjust is for the starting point of
2131 * an accurate seek (start if forward and stop if backwards)
2133 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2134 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2135 GST_DEBUG_OBJECT (demux,
2136 "seek before live stream start, setting to range start: %"
2137 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2138 start = range_start;
2141 /* truncate stop position also if set */
2142 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2143 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2144 GST_DEBUG_OBJECT (demux,
2145 "seek ending after live start, adjusting to: %"
2146 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2151 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2152 (start < range_start || start > range_stop)) {
2153 GST_WARNING_OBJECT (demux,
2154 "Seek to invalid position start:%" GST_STIME_FORMAT
2155 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2156 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2157 GST_STIME_ARGS (range_stop));
2158 start_valid = FALSE;
2160 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2161 (stop < range_start || stop > range_stop)) {
2162 GST_WARNING_OBJECT (demux,
2163 "Seek to invalid position stop:%" GST_STIME_FORMAT
2164 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2165 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2166 GST_STIME_ARGS (range_stop));
2170 /* If the seek position is still outside of the seekable range, refuse the seek */
2171 if (!start_valid || !stop_valid) {
2172 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2173 GST_API_UNLOCK (demux);
2174 gst_event_unref (event);
2178 /* Re-create seek event with changed/updated values */
2180 gst_event_unref (event);
2182 gst_event_new_seek (rate, format, flags,
2183 start_type, start, stop_type, stop);
2184 gst_event_set_seqnum (event, seqnum);
2188 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2190 /* have a backup in case seek fails */
2191 gst_segment_copy_into (&demux->segment, &oldsegment);
2193 GST_DEBUG_OBJECT (demux, "sending flush start");
2194 flush_event = gst_event_new_flush_start ();
2195 gst_event_set_seqnum (flush_event, seqnum);
2197 gst_adaptive_demux_push_src_event (demux, flush_event);
2199 gst_adaptive_demux_stop_tasks (demux, FALSE);
2200 gst_adaptive_demux_reset_tracks (demux);
2202 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2205 * Handle snap seeks as follows:
2206 * 1) do the snap seeking a (random) active stream
2207 * 2) use the final position on this stream to seek
2208 * on the other streams to the same position
2210 * We can't snap at all streams at the same time as they might end in
2211 * different positions, so just pick one and align all others to that
2215 GstAdaptiveDemux2Stream *stream = NULL;
2217 /* Pick a random active stream on which to do the stream seek */
2218 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2219 GstAdaptiveDemux2Stream *cand = iter->data;
2220 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2226 if (stream && IS_SNAP_SEEK (flags)) {
2227 GstClockTimeDiff ts;
2228 GstSeekFlags stream_seek_flags = flags;
2230 /* snap-seek on the chosen stream and then
2231 * use the resulting position to seek on all streams */
2233 if (start_type != GST_SEEK_TYPE_NONE)
2236 ts = gst_segment_position_from_running_time (&demux->segment,
2237 GST_FORMAT_TIME, demux->priv->global_output_position);
2238 start_type = GST_SEEK_TYPE_SET;
2241 if (stop_type != GST_SEEK_TYPE_NONE)
2244 stop_type = GST_SEEK_TYPE_SET;
2245 ts = gst_segment_position_from_running_time (&demux->segment,
2246 GST_FORMAT_TIME, demux->priv->global_output_position);
2250 if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2251 ts, &ts) != GST_FLOW_OK) {
2252 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2254 GST_API_UNLOCK (demux);
2255 gst_event_unref (event);
2259 /* replace event with a new one without snapping to seek on all streams */
2260 gst_event_unref (event);
2267 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2268 start_type, start, stop_type, stop);
2269 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2272 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2273 start, stop_type, stop, &update);
2276 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2278 ret = demux_class->seek (demux, event);
2282 /* Is there anything else we can do if it fails? */
2283 gst_segment_copy_into (&oldsegment, &demux->segment);
2285 demux->priv->segment_seqnum = seqnum;
2287 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2289 /* Resetting flow combiner */
2290 gst_flow_combiner_reset (demux->priv->flowcombiner);
2292 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2293 flush_event = gst_event_new_flush_stop (TRUE);
2294 gst_event_set_seqnum (flush_event, seqnum);
2295 gst_adaptive_demux_push_src_event (demux, flush_event);
2297 /* If the seek generated a new period, prepare it */
2298 if (!demux->input_period->prepared) {
2299 /* This can only happen on flushing seeks */
2300 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2301 gst_adaptive_demux_seek_to_input_period (demux);
2304 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2305 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2307 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2308 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2310 /* Reset the global output position (running time) for when the output loop restarts */
2311 demux->priv->global_output_position = 0;
2313 /* After a flushing seek, any instant-rate override is undone */
2314 demux->instant_rate_multiplier = 1.0;
2316 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2318 /* Restart the demux */
2319 gst_adaptive_demux_start_tasks (demux);
2321 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2322 GST_API_UNLOCK (demux);
2323 gst_event_unref (event);
2328 /* Returns TRUE if the stream has at least one selected track.
 * Walks the stream->tracks list and tests each track's 'selected' flag. */
2330 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2335 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2336 GstAdaptiveDemuxTrack *track = tmp->data;
2338 if (track->selected)
/* Applies a stream selection request to demux->output_period.
 *
 * 'streams' is a list whose elements are used as stream-id strings. Each id is
 * looked up with find_track_for_stream_id(); an unrecognized id sets
 * selection_handled to FALSE and jumps to select_streams_done. Otherwise the
 * selected/draining flags of the period's tracks are updated, streams are
 * started or stopped depending on whether they have selected tracks, and
 * seqnum is stored in priv->requested_selection_seqnum.
 *
 * The scheduler lock and TRACKS_LOCK are taken for the duration of the work.
 * Returns selection_handled. */
2346 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2349 gboolean selection_handled = TRUE;
2351 GList *tracks = NULL;
2353 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2356 TRACKS_LOCK (demux);
2357 /* Validate the streams and fill:
2358 * tracks : list of tracks corresponding to requested streams
2360 for (iter = streams; iter; iter = iter->next) {
2361 gchar *stream_id = (gchar *) iter->data;
2362 GstAdaptiveDemuxTrack *track;
2364 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2365 track = find_track_for_stream_id (demux->output_period, stream_id);
2367 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2368 selection_handled = FALSE;
2369 goto select_streams_done;
2371 tracks = g_list_append (tracks, track);
2372 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2375 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2376 * SCHEDULING THREAD */
2378 /* FIXME: We want to iterate all streams, mark them as deselected,
2379 * then iterate tracks and mark any streams that have at least 1
2380 * active output track, then loop over all streams again and start/stop
2383 /* Go over all tracks present and (de)select based on current selection */
2384 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2385 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
/* Selected but not in the requested list: deselect and mark as draining.
 * Not selected but requested: select. */
2387 if (track->selected && !g_list_find (tracks, track)) {
2388 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2389 track->stream_id, track->active);
2390 track->selected = FALSE;
2391 track->draining = TRUE;
2392 } else if (!track->selected && g_list_find (tracks, track)) {
2393 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2395 track->selected = TRUE;
2399 /* Start or stop streams based on the updated track selection */
2400 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2401 GstAdaptiveDemux2Stream *stream = iter->data;
/* Compare the stream's current running state with whether any of its tracks
 * is selected */
2404 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2405 gboolean should_be_running =
2406 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2408 if (!is_running && should_be_running) {
2409 GstClockTime output_running_ts = demux->priv->global_output_position;
2410 GstClockTime start_position;
2412 /* Calculate where we should start the stream, and then
2414 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2416 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2417 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2418 GST_TIME_ARGS (output_running_ts), &demux->segment);
2421 gst_segment_position_from_running_time (&demux->segment,
2422 GST_FORMAT_TIME, output_running_ts);
2424 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2426 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2427 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2429 stream->current_position = stream->start_position = start_position;
2430 stream->compute_segment = TRUE;
2432 /* If output has already begun, ensure we seek this segment
2433 * to the correct restart position when the download loop begins */
2434 if (output_running_ts != 0)
2435 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2437 /* Activate track pads for this stream */
2438 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2439 GstAdaptiveDemuxTrack *track =
2440 (GstAdaptiveDemuxTrack *) trackiter->data;
2441 gst_pad_set_active (track->sinkpad, TRUE);
2444 gst_adaptive_demux2_stream_start (stream);
2445 } else if (is_running && !should_be_running) {
2446 /* Stream should not be running and needs stopping */
2447 gst_adaptive_demux2_stream_stop (stream);
2449 /* Set all track sinkpads to inactive for this stream */
2450 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2451 GstAdaptiveDemuxTrack *track =
2452 (GstAdaptiveDemuxTrack *) trackiter->data;
2453 gst_pad_set_active (track->sinkpad, FALSE);
/* Record the seqnum of this request; check_and_handle_selection_update_locked()
 * compares it against priv->current_selection_seqnum */
2458 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
/* Also the target of the goto on the unrecognized stream-id path */
2460 select_streams_done:
2461 demux_update_buffering_locked (demux);
2462 demux_post_buffering_locked (demux);
2464 TRACKS_UNLOCK (demux);
2465 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
/* Only the list cells are freed; the tracks themselves are not owned by it */
2468 g_list_free (tracks);
2469 return selection_handled;
/* Handles a SELECT_STREAMS event.
 *
 * If the event's seqnum equals priv->requested_selection_seqnum the request is
 * logged as already handled/being handled. Otherwise the requested stream ids
 * are parsed out of the event and passed to handle_stream_selection() together
 * with the event seqnum; the parsed list is then freed (elements with g_free)
 * and the event is unreffed. Returns selection_handled. */
2473 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2477 gboolean selection_handled;
2479 if (GST_EVENT_SEQNUM (event) ==
2480 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2481 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2482 GST_EVENT_SEQNUM (event));
/* NOTE(review): no gst_event_unref() is visible on this duplicate-seqnum
 * path, unlike the duplicate-SEEK path in gst_adaptive_demux_src_event() --
 * confirm the event is not leaked here */
2486 gst_event_parse_select_streams (event, &streams);
2488 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2489 g_list_free_full (streams, g_free);
2491 gst_event_unref (event);
2492 return selection_handled;
/* Event handler for the source pads.
 *
 * SEEK: dropped (unreffed) when its seqnum equals priv->segment_seqnum,
 *   otherwise handed to gst_adaptive_demux_handle_seek_event().
 * LATENCY: consumed here (the event is unreffed).
 * QOS: updates priv->qos_earliest_time under the object lock.
 * SELECT_STREAMS: handed to gst_adaptive_demux_handle_select_streams_event().
 * Events that are not returned from inside the switch end up in
 * gst_pad_event_default(). */
2496 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2499 GstAdaptiveDemux *demux;
2501 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2503 switch (event->type) {
2504 case GST_EVENT_SEEK:
2506 guint32 seqnum = gst_event_get_seqnum (event);
/* A seek carrying the seqnum of the current segment is a duplicate */
2507 if (seqnum == demux->priv->segment_seqnum) {
2508 GST_LOG_OBJECT (pad,
2509 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2510 gst_event_unref (event);
2513 return gst_adaptive_demux_handle_seek_event (demux, event);
2515 case GST_EVENT_LATENCY:{
2516 /* Upstream and our internal source are irrelevant
2517 * for latency, and we should not fail here to
2518 * configure the latency */
2519 gst_event_unref (event);
2522 case GST_EVENT_QOS:{
2523 GstClockTimeDiff diff;
2524 GstClockTime timestamp;
2525 GstClockTime earliest_time;
/* Only the jitter (diff) and timestamp of the QoS event are needed */
2527 gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2528 /* Only take into account lateness if late */
2530 earliest_time = timestamp + 2 * diff;
2532 earliest_time = timestamp;
2534 GST_OBJECT_LOCK (demux);
/* qos_earliest_time is only set when currently invalid or moved forward */
2535 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2536 earliest_time > demux->priv->qos_earliest_time) {
2537 demux->priv->qos_earliest_time = earliest_time;
2538 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2539 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2541 GST_OBJECT_UNLOCK (demux);
2544 case GST_EVENT_SELECT_STREAMS:
2546 return gst_adaptive_demux_handle_select_streams_event (demux, event);
2552 return gst_pad_event_default (pad, parent, event);
/* Query handler for the source pads.
 *
 * DURATION: answers -1 for live streams; otherwise, for TIME format once the
 *   manifest is available, answers with priv->duration when it is valid and
 *   non-zero.
 * LATENCY: answered with live=FALSE, min 0, max -1.
 * SEEKING: refused (FALSE) until the manifest is available; for TIME format
 *   the answer is built from gst_adaptive_demux_can_seek() and either the live
 *   seek range or priv->duration.
 * The manifest URI is reported through gst_query_set_uri() when
 *   demux->manifest_uri is set.
 * SELECTABLE: answered TRUE. */
2556 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2559 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2560 gboolean ret = FALSE;
2566 switch (query->type) {
2567 case GST_QUERY_DURATION:{
2569 GstClockTime duration = GST_CLOCK_TIME_NONE;
2571 gst_query_parse_duration (query, &fmt, NULL);
2573 if (gst_adaptive_demux_is_live (demux)) {
2574 /* We are able to answer this query: the duration is unknown */
2575 gst_query_set_duration (query, fmt, -1);
2580 if (fmt == GST_FORMAT_TIME
2581 && g_atomic_int_get (&demux->priv->have_manifest)) {
/* priv->duration is read under the manifest lock */
2583 GST_MANIFEST_LOCK (demux);
2584 duration = demux->priv->duration;
2585 GST_MANIFEST_UNLOCK (demux);
2587 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2588 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2593 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2594 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2597 case GST_QUERY_LATENCY:{
/* live = FALSE, min latency 0, max latency -1 */
2598 gst_query_set_latency (query, FALSE, 0, -1);
2602 case GST_QUERY_SEEKING:{
2607 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2608 GST_INFO_OBJECT (demux,
2609 "Don't have manifest yet, can't answer seeking query");
2610 return FALSE; /* can't answer without manifest */
2613 GST_MANIFEST_LOCK (demux);
2615 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2616 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2617 if (fmt == GST_FORMAT_TIME) {
2618 GstClockTime duration;
2619 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2623 if (gst_adaptive_demux_is_live (demux)) {
2624 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2626 GST_MANIFEST_UNLOCK (demux);
2627 GST_INFO_OBJECT (demux, "can't answer seeking query");
2631 duration = demux->priv->duration;
2632 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2636 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2637 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2638 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2639 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2641 GST_MANIFEST_UNLOCK (demux);
2646 GST_MANIFEST_LOCK (demux);
2648 /* TODO HLS can answer this differently it seems */
2649 if (demux->manifest_uri) {
2650 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2651 * playlist or the uri of the last downloaded fragment? */
2652 gst_query_set_uri (query, demux->manifest_uri);
2656 GST_MANIFEST_UNLOCK (demux);
2658 case GST_QUERY_SELECTABLE:
2659 gst_query_set_selectable (query, TRUE);
2663 /* Don't forward queries upstream because of the special nature of this
2664 * "demuxer", which relies on the upstream element only to be fed
/* Recovery path for GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC (called from
 * gst_adaptive_demux_manifest_update_cb()): logs a warning and issues an
 * internal flushing, key-unit seek at rate 1.0 in TIME format, with start type
 * GST_SEEK_TYPE_END / offset 0 and no stop position, by passing the event
 * straight to gst_adaptive_demux_handle_seek_event(). */
2674 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2678 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2681 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2682 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2683 GST_SEEK_TYPE_NONE, 0);
2684 gst_adaptive_demux_handle_seek_event (demux, seek);
2689 /* Called when the scheduler starts, to kick off manifest updates
2690 * and stream downloads */
/* Walks demux->input_period->streams and starts every stream that either has
 * pending_tracks set or is reported selected by
 * gst_adaptive_demux2_stream_is_selected_locked(). */
2692 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2696 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2698 iter = demux->input_period->streams;
2700 for (; iter; iter = g_list_next (iter)) {
2701 GstAdaptiveDemux2Stream *stream = iter->data;
2703 /* If we need to process this stream to discover tracks *OR* it has any
2704 * tracks which are selected, start it now */
2705 if ((stream->pending_tracks == TRUE)
2706 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2707 gst_adaptive_demux2_stream_start (stream);
2713 /* must be called with manifest_lock taken */
/* Starts the demuxer's tasks unless gst_adaptive_demux2_is_running() reports
 * that the demuxer is not running (shutdown). Queues
 * gst_adaptive_demux_scheduler_start_cb() on the scheduler task, then, under
 * TRACKS_LOCK, clears priv->flushing and starts the output task. */
2715 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2717 if (!gst_adaptive_demux2_is_running (demux)) {
2718 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2722 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2723 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2724 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2726 TRACKS_LOCK (demux);
2727 demux->priv->flushing = FALSE;
2728 GST_DEBUG_OBJECT (demux, "Starting the output task");
2729 gst_task_start (demux->priv->output_task);
2730 TRACKS_UNLOCK (demux);
2733 /* must be called with manifest_lock taken */
/* Cancels the manifest-update callback registered on the scheduler task, if
 * there is one (priv->manifest_updates_cb != 0), and clears the stored id. */
2735 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2737 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2738 if (demux->priv->manifest_updates_cb != 0) {
2739 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2740 demux->priv->manifest_updates_cb);
2741 demux->priv->manifest_updates_cb = 0;
/* Forward declaration: scheduled as a callback by
 * gst_adaptive_demux_start_manifest_update_task() */
2745 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2747 /* must be called with manifest_lock taken */
/* For live streams whose class reports requires_periodical_playlist_update(),
 * schedules gst_adaptive_demux_updates_start_cb() on the scheduler task unless
 * a manifest-update callback is already registered
 * (priv->manifest_updates_cb != 0). */
2749 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2751 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2753 if (gst_adaptive_demux_is_live (demux)) {
2754 /* Task to periodically update the manifest */
2755 if (demux_class->requires_periodical_playlist_update (demux)) {
2756 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2757 if (demux->priv->manifest_updates_cb == 0) {
2758 demux->priv->manifest_updates_cb =
2759 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2760 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
/* Stops the demuxer's tasks. Under TRACKS_LOCK: stops the tasks of
 * input_period (when there is one), sets priv->flushing, signals the
 * tracks_add condition and stops the output task. The output task is then
 * joined outside TRACKS_LOCK and priv->qos_earliest_time is reset to
 * GST_CLOCK_TIME_NONE.
 * NOTE(review): the call to gst_adaptive_demux_stop_manifest_update_task() is
 * presumably guarded by stop_updates -- confirm. */
2766 /* must be called with manifest_lock taken
2767 * This function will temporarily release manifest_lock in order to join the
2769 * The api_lock will still protect it against other threads trying to modify
2770 * the demux element.
2773 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2775 GST_LOG_OBJECT (demux, "Stopping tasks");
2778 gst_adaptive_demux_stop_manifest_update_task (demux);
2780 TRACKS_LOCK (demux);
2781 if (demux->input_period)
2782 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2784 demux->priv->flushing = TRUE;
2785 g_cond_signal (&demux->priv->tracks_add);
2786 gst_task_stop (demux->priv->output_task);
2787 TRACKS_UNLOCK (demux);
2789 gst_task_join (demux->priv->output_task);
2791 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2794 /* must be called with manifest_lock taken */
/* Pushes 'event' on the pad of every OutputSlot in priv->outputs while holding
 * TRACKS_LOCK. A reference is taken for each push and the caller's reference
 * is dropped at the end, so the function consumes 'event'. 'ret' accumulates
 * the AND of all gst_pad_push_event() results (it stays TRUE when there are no
 * outputs). A FLUSH_STOP additionally clears each slot's pushed_timed_data
 * flag. */
2796 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2799 gboolean ret = TRUE;
2801 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2803 TRACKS_LOCK (demux);
2804 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2805 OutputSlot *slot = (OutputSlot *) iter->data;
2806 gst_event_ref (event);
2807 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2808 ret = ret & gst_pad_push_event (slot->pad, event);
2809 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2810 slot->pushed_timed_data = FALSE;
2812 TRACKS_UNLOCK (demux);
2813 gst_event_unref (event);
2817 /* must be called with manifest_lock taken */
/* Stores 'caps' as the stream's pending_caps via gst_caps_replace() and then
 * drops the caller's reference, i.e. the function takes ownership of 'caps'. */
2819 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2822 GST_DEBUG_OBJECT (stream,
2823 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2824 gst_caps_replace (&stream->pending_caps, caps);
2825 gst_caps_unref (caps);
2828 /* must be called with manifest_lock taken */
/* Replaces stream->pending_tags with 'tags', unreffing any previously pending
 * tag list. 'tags' is stored as-is, without taking an extra reference. */
2830 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2833 GST_DEBUG_OBJECT (stream,
2834 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2835 if (stream->pending_tags) {
2836 gst_tag_list_unref (stream->pending_tags);
2838 stream->pending_tags = tags;
2841 /* must be called with manifest_lock taken */
/* Appends 'event' to the stream's pending_events list; no extra reference is
 * taken on it. */
2843 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2846 stream->pending_events = g_list_append (stream->pending_events, event);
/* NOTE(review): presumably the default implementation of the
 * requires_periodical_playlist_update class hook that
 * gst_adaptive_demux_start_manifest_update_task() consults -- confirm. */
2850 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2856 /* Called when a stream needs waking after the manifest is updated */
/* Sets priv->stream_waiting_for_manifest, which
 * gst_adaptive_demux_manifest_update_cb() checks (and clears) after a
 * successful update. */
2858 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2860 demux->priv->stream_waiting_for_manifest = TRUE;
/* Scheduler callback performing one manifest update.
 *
 * Runs under the manifest lock. Clears priv->manifest_updates_cb, and returns
 * without rescheduling when the stream is not live or the update returned
 * GST_FLOW_EOS. Otherwise:
 *  - on GST_FLOW_OK, update_failed_count is reset and, if a stream flagged
 *    stream_waiting_for_manifest, every stream of input_period is notified via
 *    gst_adaptive_demux2_stream_on_manifest_update();
 *  - when the update did not succeed, update_failed_count is incremented; a
 *    warning is logged while the count is <= DEFAULT_FAILED_COUNT, after which
 *    an element error is posted and rescheduling is disabled;
 *  - on GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC,
 *    gst_adaptive_demux_handle_lost_sync() is called.
 * While schedule_again is TRUE the callback re-arms itself with a delay of
 * klass->get_manifest_update_interval() * GST_USECOND. Every visible return
 * yields G_SOURCE_REMOVE. */
2864 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2866 GstFlowReturn ret = GST_FLOW_OK;
2867 gboolean schedule_again = TRUE;
2869 GST_MANIFEST_LOCK (demux);
/* This invocation is running, so the stored callback id is no longer pending */
2870 demux->priv->manifest_updates_cb = 0;
2872 /* Updating playlist only needed for live playlists */
2873 if (!gst_adaptive_demux_is_live (demux)) {
2874 GST_MANIFEST_UNLOCK (demux);
2875 return G_SOURCE_REMOVE;
2878 GST_DEBUG_OBJECT (demux, "Updating playlist");
2879 ret = gst_adaptive_demux_update_manifest (demux);
2881 if (ret == GST_FLOW_EOS) {
2882 GST_MANIFEST_UNLOCK (demux);
2883 return G_SOURCE_REMOVE;
2886 if (ret == GST_FLOW_OK) {
2887 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2888 demux->priv->update_failed_count = 0;
2890 /* Wake up download tasks */
2891 if (demux->priv->stream_waiting_for_manifest) {
2894 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2895 GstAdaptiveDemux2Stream *stream = iter->data;
2896 gst_adaptive_demux2_stream_on_manifest_update (stream);
2898 demux->priv->stream_waiting_for_manifest = FALSE;
2901 demux->priv->update_failed_count++;
2903 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2904 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2905 gst_flow_get_name (ret));
2907 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2908 (_("Internal data stream error.")), ("Could not update playlist"));
2909 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2910 schedule_again = FALSE;
2914 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2915 gst_adaptive_demux_handle_lost_sync (demux);
2917 if (schedule_again) {
2918 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2920 demux->priv->manifest_updates_cb =
2921 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2922 klass->get_manifest_update_interval (demux) * GST_USECOND,
2923 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2926 GST_MANIFEST_UNLOCK (demux);
2928 return G_SOURCE_REMOVE;
/* Scheduler callback that arms a delayed call of
 * gst_adaptive_demux_manifest_update_cb(), delayed by
 * klass->get_manifest_update_interval() * GST_USECOND, and stores the returned
 * id in priv->manifest_updates_cb. Returns G_SOURCE_REMOVE. */
2932 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2934 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2936 /* Loop for updating of the playlist. This periodically checks if
2937 * the playlist is updated and does so, then signals the streaming
2938 * thread in case it can continue downloading now. */
2940 /* block until the next scheduled update or the signal to quit this thread */
2941 GST_DEBUG_OBJECT (demux, "Started updates task");
2942 demux->priv->manifest_updates_cb =
2943 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2944 klass->get_manifest_update_interval (demux) * GST_USECOND,
2945 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2947 return G_SOURCE_REMOVE;
/* Scans priv->outputs for an existing slot that 'track' could take over.
 * The checks, in order: the slot's type must match track->type; a slot that
 * already has 'track' as its pending track; a slot that has some other
 * pending track; and a slot whose current track is draining. */
2951 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2952 GstAdaptiveDemuxTrack * track)
2956 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2957 OutputSlot *slot = (OutputSlot *) tmp->data;
2958 /* Incompatible output type */
2959 if (slot->type != track->type)
2962 /* Slot which is already assigned to this pending track */
2963 if (slot->pending_track == track)
2966 /* slot already used for another pending track */
2967 if (slot->pending_track != NULL)
2970 /* Current output track is of the same type and is draining */
2971 if (slot->track && slot->track->draining)
2978 /* TRACKS_LOCK taken */
/* Looks through priv->outputs for the slot whose current track is 'track'. */
2980 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
2984 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2985 OutputSlot *slot = (OutputSlot *) tmp->data;
2987 if (slot->track == track)
2994 /* TRACKS_LOCK held */
/* Checks the tracks of demux->output_period for any that is selected but not
 * yet active. When none is found, a streams-selected message is built for the
 * period's collection, stamped with 'seqnum', and the stream object of every
 * active track is added to it. */
2996 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3001 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3002 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3004 if (track->selected && !track->active)
3008 /* All selected tracks are active, create message */
3010 gst_message_new_streams_selected (GST_OBJECT (demux),
3011 demux->output_period->collection);
3012 GST_MESSAGE_SEQNUM (msg) = seqnum;
3013 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3014 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3015 if (track->active) {
3016 gst_message_streams_selected_add (msg, track->stream_object);
/* Pushes the initial events for slot->track on slot->pad: a stream-start
 * event (carrying the demuxer's group id when have_group_id is set, plus the
 * track's stream flags and stream object) followed by a stream-collection
 * event for demux->output_period->collection. All of the track's sticky
 * events are then marked undelivered. */
3024 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3027 GstAdaptiveDemuxTrack *track = slot->track;
3030 /* Send EVENT_STREAM_START */
3031 event = gst_event_new_stream_start (track->stream_id);
3032 if (demux->have_group_id)
3033 gst_event_set_group_id (event, demux->group_id);
3034 gst_event_set_stream_flags (event, track->flags);
3035 gst_event_set_stream (event, track->stream_object);
3036 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3038 gst_pad_push_event (slot->pad, event);
3040 /* Send EVENT_STREAM_COLLECTION */
3041 event = gst_event_new_stream_collection (demux->output_period->collection);
3042 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3044 gst_pad_push_event (slot->pad, event);
3046 /* Mark all sticky events for re-sending */
3047 gst_event_store_mark_all_undelivered (&track->sticky_events);
3051 /* Called with TRACKS_LOCK taken */
/* Reconciles the output slots with the current track selection.
 *
 * Work is only done when priv->requested_selection_seqnum differs from
 * priv->current_selection_seqnum:
 *  1. pending tracks that are no longer selected are dropped from their slot;
 *  2. every selected track of output_period that has no slot is either queued
 *     as the pending track of a replacement slot, or given a newly created
 *     slot (in which case it becomes active and its initial events are sent);
 *  3. slots whose track is draining and that have no pending track are removed
 *     from priv->outputs and freed.
 * Afterwards current_selection_seqnum is updated and the message obtained from
 * all_selected_tracks_are_active() is posted, with TRACKS_LOCK released around
 * the post. */
3054 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3057 guint requested_selection_seqnum;
3060 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3061 output slots vs active/draining tracks */
3062 requested_selection_seqnum =
3063 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3065 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3068 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3070 /* Go over all slots, and if they have a pending track that's no longer
3071 * selected, clear it so the slot can be reused */
3072 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3073 OutputSlot *slot = (OutputSlot *) tmp->data;
3075 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3076 GST_DEBUG_OBJECT (demux,
3077 "Removing deselected track '%s' as pending from output of current track '%s'",
3078 slot->pending_track->stream_id, slot->track->stream_id);
3079 gst_adaptive_demux_track_unref (slot->pending_track);
3080 slot->pending_track = NULL;
3084 /* Go over all tracks and create/re-assign/remove slots */
3085 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3086 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3088 if (track->selected) {
3089 OutputSlot *slot = find_slot_for_track (demux, track);
3091 /* 0. Track is selected and has a slot. Nothing to do */
3093 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3098 slot = find_replacement_slot_for_track (demux, track);
3100 /* 1. There is an existing slot of the same type which is currently
3101 * draining, assign this track as a replacement for it */
3102 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3103 if (slot->pending_track == NULL) {
/* The slot holds its own reference on the pending track */
3104 slot->pending_track = gst_adaptive_demux_track_ref (track);
3105 GST_DEBUG_OBJECT (demux,
3106 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3107 track->stream_id, track->period_num,
3108 slot->track->stream_id, slot->track->period_num);
3111 /* 2. There is no compatible replacement slot, create a new one */
3112 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3113 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3115 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3117 track->update_next_segment = TRUE;
3119 slot->track = gst_adaptive_demux_track_ref (track);
3120 track->active = TRUE;
3121 gst_adaptive_demux_send_initial_events (demux, slot);
3124 /* If we were draining this track, we no longer are */
3125 track->draining = FALSE;
3129 /* Finally check all slots have a current/pending track. If not remove it */
3130 for (tmp = demux->priv->outputs; tmp;) {
3131 OutputSlot *slot = (OutputSlot *) tmp->data;
3132 /* We should never have slots without target tracks */
3133 g_assert (slot->track);
3134 if (slot->track->draining && !slot->pending_track) {
3135 GstAdaptiveDemux2Stream *stream;
3137 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3138 slot->track->stream_id);
3139 slot->track->active = FALSE;
3141 /* If the stream feeding this track is stopped, flush and clear
3142 * the track now that it's going inactive. If the stream was not
3143 * found, it means we advanced past that period already (and the
3144 * stream was stopped and discarded) */
3145 stream = find_stream_for_track_locked (demux, slot->track);
3146 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3147 gst_adaptive_demux_track_flush (slot->track);
/* The list head changes on removal, so iteration restarts from the new head */
3149 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3150 gst_adaptive_demux_output_slot_free (demux, slot);
3155 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3156 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
/* TRACKS_LOCK is dropped while the message is posted and re-taken afterwards */
3158 TRACKS_UNLOCK (demux);
3159 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3160 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3161 TRACKS_LOCK (demux);
3165 /* TRACKS_LOCK held */
/* Moves output to the next period: pops the head of priv->periods, re-points
 * demux->output_period at the new head of the queue, posts the new period's
 * stream collection (with TRACKS_LOCK released around the post), deselects and
 * marks as draining every selected track of the previous period, then bumps
 * requested_selection_seqnum so that
 * check_and_handle_selection_update_locked() re-evaluates the slots. The last
 * reference on the previous period is dropped at the end. */
3167 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3170 GstAdaptiveDemuxPeriod *previous_period;
3171 GstStreamCollection *collection;
3173 /* Grab the next period, should be demux->periods->next->data */
3174 previous_period = g_queue_pop_head (demux->priv->periods);
3176 /* Remove ref held by demux->output_period */
3177 gst_adaptive_demux_period_unref (previous_period);
3178 demux->output_period =
3179 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3181 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3182 demux->output_period->period_num);
3184 /* We can now post the collection of the new period */
3185 collection = demux->output_period->collection;
3186 TRACKS_UNLOCK (demux);
3187 gst_element_post_message (GST_ELEMENT_CAST (demux),
3188 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3189 TRACKS_LOCK (demux);
3191 /* Unselect all tracks of the previous period */
3192 for (iter = previous_period->tracks; iter; iter = iter->next) {
3193 GstAdaptiveDemuxTrack *track = iter->data;
3194 if (track->selected) {
3195 track->selected = FALSE;
3196 track->draining = TRUE;
3200 /* Force a selection re-check */
3201 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3202 check_and_handle_selection_update_locked (demux);
3204 /* Remove the final ref on the previous period now that we have done the switch */
3205 gst_adaptive_demux_period_unref (previous_period);
3210 /* Called with TRACKS_LOCK taken */
/* Checks whether an output slot can move from its current, draining track to
 * slot->pending_track, and performs the switch when it can.
 *
 * The pending track counts as ready once its level_time has reached its
 * buffering_threshold, or once it is EOS. While it is not ready and the
 * current track still has queued items, the function only logs that the
 * replacement is not ready yet. Otherwise the current track is deactivated
 * (and flushed if its feeding stream still exists but is not running), the
 * slot takes over the pending track's reference, the initial events are
 * sent for the slot, and the result of all_selected_tracks_are_active() is
 * posted on the bus with TRACKS_LOCK temporarily released. */
3212 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3215 GstAdaptiveDemuxTrack *track = slot->track;
3217 gboolean pending_is_ready;
3218 GstAdaptiveDemux2Stream *stream;
3220 /* If we have a pending track for this slot, the current track should be
3221 * draining and no longer selected */
3222 g_assert (track->draining && !track->selected);
3224 /* If we're draining, check if the pending track has enough data *or* that
3225 we've already drained out entirely */
3227 (slot->pending_track->level_time >=
3228 slot->pending_track->buffering_threshold);
/* An EOS pending track will never buffer more, so it is treated as ready
 * regardless of its level. */
3229 pending_is_ready |= slot->pending_track->eos;
/* Only hold the switch back while the current track still has something
 * queued; an empty current track does not block the switch. */
3231 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3232 GST_DEBUG_OBJECT (demux,
3233 "Replacement track '%s' doesn't have enough data for switching yet",
3234 slot->pending_track->stream_id);
3238 GST_DEBUG_OBJECT (demux,
3239 "Pending replacement track has enough data, switching");
3240 track->active = FALSE;
3241 track->draining = FALSE;
3243 /* If the stream feeding this track is stopped, flush and clear
3244 * the track now that it's going inactive. If the stream was not
3245 * found, it means we advanced past that period already (and the
3246 * stream was stopped and discarded) */
3247 stream = find_stream_for_track_locked (demux, track);
3248 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3249 gst_adaptive_demux_track_flush (track);
/* Drop the slot's reference on the outgoing track before re-pointing the
 * slot at the pending one. */
3251 gst_adaptive_demux_track_unref (track);
3252 /* We steal the reference of pending_track */
3253 track = slot->track = slot->pending_track;
3254 slot->pending_track = NULL;
3255 slot->track->active = TRUE;
3257 /* Make sure the track segment will start at the current position */
3258 track->update_next_segment = TRUE;
3260 /* Send stream start and collection, and schedule sticky events */
3261 gst_adaptive_demux_send_initial_events (demux, slot);
3263 /* Can we emit the streams-selected message now ? */
3265 all_selected_tracks_are_active (demux,
3266 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
/* The message is posted with TRACKS_LOCK released; the lock is re-taken
 * right after so the caller still holds it on return. */
3268 TRACKS_UNLOCK (demux);
3269 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3270 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3271 TRACKS_LOCK (demux);
/* Output task function: pushes queued track data downstream on the pads of
 * the output slots, ordered by a global output position.
 *
 * With TRACKS_LOCK held, one pass of the loop:
 *  - stops with GST_FLOW_FLUSHING when priv->flushing is set;
 *  - applies any pending track selection update;
 *  - if a previous pass asked for it, waits on priv->tracks_add for data;
 *  - walks the output slots, switching a slot to its pending track when
 *    possible, and computes global_output_position as the minimum
 *    next_position of the tracks that have one;
 *  - moves on to the next period when every track is empty and the output
 *    period has a successor;
 *  - dequeues items per slot and pushes events/buffers downstream, with
 *    TRACKS_LOCK released around the pushes;
 *  - stores the global output position and checks whether input streams
 *    need to be woken.
 *
 * The task pauses itself when the global output position ends up NONE or
 * when a non-OK flow return is hit, unless priv->flushing is set. On the
 * non-OK path an EOS event may additionally be sent downstream (see the end
 * of the function). */
3277 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3280 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3281 gboolean wait_for_data = FALSE;
3282 gboolean all_tracks_empty;
3285 GST_DEBUG_OBJECT (demux, "enter");
3287 TRACKS_LOCK (demux);
3289 /* Check if stopping */
3290 if (demux->priv->flushing) {
3291 ret = GST_FLOW_FLUSHING;
3295 /* If the selection changed, handle it */
3296 check_and_handle_selection_update_locked (demux);
3300 global_output_position = GST_CLOCK_STIME_NONE;
3301 all_tracks_empty = TRUE;
3303 if (wait_for_data) {
3304 GST_DEBUG_OBJECT (demux, "Waiting for data");
/* g_cond_wait() releases tracks_lock while blocked and holds it again when
 * it returns, which is why the flushing flag is re-checked afterwards. */
3305 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3306 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3307 if (demux->priv->flushing) {
3308 ret = GST_FLOW_FLUSHING;
3311 wait_for_data = FALSE;
3314 /* Grab/Recalculate current global output position
3315 * This is the minimum pending output position of all tracks used for output
3317 * If there is a track which is empty and not EOS, wait for it to receive data
3318 * then recalculate global output position.
3320 * This also pushes downstream all non-timed data that might be present.
3322 * IF all tracks are EOS : stop task
3324 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3325 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3326 OutputSlot *slot = (OutputSlot *) tmp->data;
3327 GstAdaptiveDemuxTrack *track;
3329 /* If there is a pending track, Check if it's time to switch to it */
3330 if (slot->pending_track)
3331 handle_slot_pending_track_switch_locked (demux, slot);
/* Read slot->track only after the call above, which may have replaced it
 * with the pending track. */
3333 track = slot->track;
3335 if (!track->active) {
3336 /* Note: Edward: I can't see in what cases we would end up with inactive
3337 tracks assigned to slots. */
3338 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3339 g_assert (track->active);
3343 if (track->next_position == GST_CLOCK_STIME_NONE) {
3344 gst_adaptive_demux_track_update_next_position (track);
3347 GST_TRACE_OBJECT (demux,
3348 "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3349 track->stream_id, track->period_num,
3350 GST_STIME_ARGS (track->next_position));
3352 if (track->next_position != GST_CLOCK_STIME_NONE) {
3353 if (global_output_position == GST_CLOCK_STIME_NONE)
3354 global_output_position = track->next_position;
3356 global_output_position =
3357 MIN (global_output_position, track->next_position);
3358 track->waiting_add = FALSE;
3359 all_tracks_empty = FALSE;
3360 } else if (!track->eos) {
3361 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3362 track->stream_id, track->period_num);
3363 all_tracks_empty = FALSE;
3364 wait_for_data = track->waiting_add = TRUE;
3366 GST_DEBUG_OBJECT (demux,
3367 "Track %s (period %u) is EOS, not waiting for timed data",
3368 track->stream_id, track->period_num);
3370 if (gst_queue_array_get_length (track->queue) > 0) {
3371 all_tracks_empty = FALSE;
3379 if (all_tracks_empty && demux->output_period->has_next_period) {
3380 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3381 demux->output_period->period_num);
3382 if (!gst_adaptive_demux_advance_output_period (demux)) {
3383 /* Failed to move to next period, error out */
3384 ret = GST_FLOW_ERROR;
3387 /* Restart the loop */
3391 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3392 GST_STIME_ARGS (global_output_position));
3396 * We know all active tracks have pending timed data
3397 * * while track next_position <= global output position
3398 * * push pending data
3399 * * Update track next_position
3400 * * recalculate global output position
3401 * * Pop next pending data from track and update pending position
3404 gboolean need_restart = FALSE;
3406 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3407 OutputSlot *slot = (OutputSlot *) tmp->data;
3408 GstAdaptiveDemuxTrack *track = slot->track;
3410 GST_LOG_OBJECT (track->element,
3411 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3412 " global_output_position:%" GST_STIME_FORMAT, track->active,
3413 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3414 GST_STIME_ARGS (global_output_position));
/* Keep dequeuing from this track while any of these holds: no global output
 * position is known; this slot has not pushed any timed data yet; the
 * track's next item is at or before the global output position; or the
 * track is EOS and has no timed position left. */
3419 while (global_output_position == GST_CLOCK_STIME_NONE
3420 || !slot->pushed_timed_data
3421 || ((track->next_position != GST_CLOCK_STIME_NONE)
3422 && track->next_position <= global_output_position)
3423 || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3424 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3427 GST_DEBUG_OBJECT (demux,
3428 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3429 track->stream_id, track->period_num, track->eos,
3430 slot->pushed_timed_data);
3431 /* This should only happen if the track is EOS, or exactly in between
3432 * the parser outputting segment/caps before buffers. */
3433 g_assert (track->eos || !slot->pushed_timed_data);
3435 /* If we drained the track, but there's a pending track on the slot
3436 * loop again to activate it */
3437 if (slot->pending_track) {
3438 GST_DEBUG_OBJECT (demux,
3439 "Track '%s' (period %u) drained, but has a pending track to activate",
3440 track->stream_id, track->period_num);
/* Buffering state is refreshed while the lock is still held; the lock is
 * then released for the downstream push and re-taken after it. */
3446 demux_update_buffering_locked (demux);
3447 demux_post_buffering_locked (demux);
3448 TRACKS_UNLOCK (demux);
3450 GST_DEBUG_OBJECT (demux,
3451 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3452 track->period_num, mo);
3454 if (GST_IS_EVENT (mo)) {
3455 GstEvent *event = (GstEvent *) mo;
/* A GAP event counts as timed data for the slot, like a buffer does. */
3456 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3457 slot->pushed_timed_data = TRUE;
3458 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3459 /* If there is a pending next period, don't send the EOS */
3460 if (demux->output_period->has_next_period) {
3461 GST_LOG_OBJECT (demux,
3462 "Dropping EOS on track '%s' (period %u) before next period",
3463 track->stream_id, track->period_num);
3464 gst_event_store_mark_delivered (&track->sticky_events, event);
3465 gst_event_unref (event);
3467 /* We'll need to re-check if all tracks are empty again above */
3468 need_restart = TRUE;
3472 if (event != NULL) {
/* An extra reference is pushed so that `event` is still valid for the
 * sticky-event bookkeeping and the unref that follow. */
3473 gst_pad_push_event (slot->pad, gst_event_ref (event));
3475 if (GST_EVENT_IS_STICKY (event))
3476 gst_event_store_mark_delivered (&track->sticky_events, event);
3477 gst_event_unref (event);
3479 } else if (GST_IS_BUFFER (mo)) {
3480 GstBuffer *buffer = (GstBuffer *) mo;
/* A pending discontinuity on the track is carried by the next buffer: the
 * DISCONT flag is set on it (after making the buffer writable) unless it is
 * already flagged, and the pending marker is cleared. */
3482 if (track->output_discont) {
3483 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3484 buffer = gst_buffer_make_writable (buffer);
3485 GST_DEBUG_OBJECT (slot->pad,
3486 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3488 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3490 track->output_discont = FALSE;
3492 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3494 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3495 slot->pad, slot->flow_ret);
3496 GST_DEBUG_OBJECT (slot->pad,
3497 "track %s (period %u) push returned %s (combined %s)",
3498 track->stream_id, track->period_num,
3499 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3500 slot->pushed_timed_data = TRUE;
3502 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3505 TRACKS_LOCK (demux);
3506 gst_adaptive_demux_track_update_next_position (track);
3508 if (ret != GST_FLOW_OK)
3513 /* Store global output position */
3514 if (global_output_position != GST_CLOCK_STIME_NONE) {
3515 demux->priv->global_output_position = global_output_position;
3517 /* And see if any streams need to be woken for more input */
3518 gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3519 global_output_position);
3525 if (global_output_position == GST_CLOCK_STIME_NONE) {
3526 if (!demux->priv->flushing) {
3527 GST_DEBUG_OBJECT (demux,
3528 "Pausing output task after reaching NONE global_output_position");
3529 gst_task_pause (demux->priv->output_task);
3533 TRACKS_UNLOCK (demux);
3534 GST_DEBUG_OBJECT (demux, "leave");
3539 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3540 /* If the flushing flag is set, then the task is being
3541 * externally stopped, so don't go to pause(), otherwise we
3542 * should so we don't keep spinning */
3543 if (!demux->priv->flushing) {
3544 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3545 gst_flow_get_name (ret));
3546 gst_task_pause (demux->priv->output_task);
3549 TRACKS_UNLOCK (demux);
/* GstFlowReturn error codes are more negative than GST_FLOW_EOS, so this
 * condition covers NOT_LINKED, EOS and the error returns, but not FLUSHING.
 * In those cases an EOS event is sent downstream (carrying the segment
 * seqnum when one is set); everything except a plain EOS is also reported
 * as an element flow error. */
3551 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3552 GstEvent *eos = gst_event_new_eos ();
3554 if (ret != GST_FLOW_EOS) {
3555 GST_ELEMENT_FLOW_ERROR (demux, ret);
3558 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3559 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3560 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3561 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3563 gst_adaptive_demux_push_src_event (demux, eos);
3570 /* must be called from the scheduler */
/* Returns the result of the subclass's is_live vfunc for this demuxer. */
3572 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3574 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3577 return klass->is_live (demux);
/* Completion callback for a manifest download request (it is installed by
 * gst_adaptive_demux_update_manifest_default()).
 *
 * The stored manifest URIs are replaced from the request: after a permanent
 * redirect with a redirect URI, that URI becomes the manifest URI and the
 * base URI is cleared; otherwise the requested URI is kept as the manifest
 * URI and a copy of the redirect URI becomes the base URI. The downloaded
 * buffer is then passed to the subclass's update_manifest_data vfunc. When
 * that returns GST_FLOW_OK, a duration-changed message is posted if the
 * subclass reports a known duration, and the manifest update task is started
 * when the stream is live and the subclass requires periodic playlist
 * updates; the stop call presumably covers the opposite case (TODO confirm,
 * the branch structure is not fully visible here). */
3582 handle_manifest_download_complete (DownloadRequest * request,
3583 DownloadRequestState state, GstAdaptiveDemux * demux)
3585 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3587 GstFlowReturn result;
/* Both URI strings are owned by the demuxer and are re-assigned on every
 * path below, so the old values are released first. */
3589 g_free (demux->manifest_base_uri);
3590 g_free (demux->manifest_uri);
3592 if (request->redirect_permanent && request->redirect_uri) {
3593 demux->manifest_uri = g_strdup (request->redirect_uri);
3594 demux->manifest_base_uri = NULL;
3596 demux->manifest_uri = g_strdup (request->uri);
3597 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3600 buffer = download_request_take_buffer (request);
3602 /* We should always have a buffer since this function is the non-error
3603 * callback for the download */
3606 result = klass->update_manifest_data (demux, buffer);
3607 gst_buffer_unref (buffer);
3609 /* FIXME: Should the manifest uri vars be reverted to original
3610 * values if updating fails? */
3612 if (result == GST_FLOW_OK) {
3613 GstClockTime duration;
3614 /* Send an updated duration message */
3615 duration = klass->get_duration (demux);
3616 if (duration != GST_CLOCK_TIME_NONE) {
3617 GST_DEBUG_OBJECT (demux,
3618 "Sending duration message : %" GST_TIME_FORMAT,
3619 GST_TIME_ARGS (duration));
3620 gst_element_post_message (GST_ELEMENT (demux),
3621 gst_message_new_duration_changed (GST_OBJECT (demux)));
3623 GST_DEBUG_OBJECT (demux,
3624 "Duration unknown, can not send the duration message");
3627 /* If a manifest changes its liveness or periodic updateness, we need
3628 * to start/stop the manifest update task appropriately */
3629 /* Keep this condition in sync with the one in
3630 * gst_adaptive_demux_start_manifest_update_task()
3632 if (gst_adaptive_demux_is_live (demux) &&
3633 klass->requires_periodical_playlist_update (demux)) {
3634 gst_adaptive_demux_start_manifest_update_task (demux);
3636 gst_adaptive_demux_stop_manifest_update_task (demux);
/* Failure callback for a manifest download request. It currently only emits
 * a FIXME log entry; no retry and no error reporting are implemented, and
 * neither @request nor @state is used. */
3642 handle_manifest_download_failure (DownloadRequest * request,
3643 DownloadRequestState state, GstAdaptiveDemux * demux)
3645 GST_FIXME_OBJECT (demux, "Manifest download failed.");
3646 /* Retry or error out here */
/* Starts a download of demux->manifest_uri (judging by the name, this is the
 * default update_manifest implementation -- TODO confirm where it is
 * installed).
 *
 * A new download request is created for the manifest URI, wired to
 * handle_manifest_download_complete() / handle_manifest_download_failure(),
 * and submitted to the demuxer's download helper with the COMPRESS and
 * FORCE_REFRESH flags.
 *
 * ret starts out as GST_FLOW_OK and is set to GST_FLOW_NOT_LINKED on the
 * submission-failure path, where a RESOURCE/FAILED element warning carrying
 * the GError message is raised and the error is cleared. */
3649 static GstFlowReturn
3650 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3652 DownloadRequest *request;
3653 GstFlowReturn ret = GST_FLOW_OK;
3654 GError *error = NULL;
3656 request = download_request_new_uri (demux->manifest_uri);
3658 download_request_set_callbacks (request,
3659 (DownloadRequestEventCallback) handle_manifest_download_complete,
3660 (DownloadRequestEventCallback) handle_manifest_download_failure,
3663 if (!downloadhelper_submit_request (demux->download_helper, NULL,
3664 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3667 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3668 ("Failed to download manifest: %s", error->message), (NULL));
3669 g_clear_error (&error);
3671 ret = GST_FLOW_NOT_LINKED;
3677 /* must be called with manifest_lock taken */
/* Forwards the manifest update to the subclass's update_manifest vfunc and
 * keeps its result in ret. */
3679 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3681 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3684 ret = klass->update_manifest (demux);
3689 /* must be called with manifest_lock taken */
/* Asks the subclass whether another period follows the current one. The
 * has_next_period vfunc is optional: when the class does not provide it,
 * ret keeps its initial FALSE. The outcome is written to the debug log. */
3691 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3693 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3694 gboolean ret = FALSE;
3696 if (klass->has_next_period)
3697 ret = klass->has_next_period (demux);
3698 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3702 /* must be called with manifest_lock taken */
/* Moves the input side of the demuxer on to the next period.
 *
 * The subclass's advance_period vfunc must be set (checked with
 * g_return_if_fail). Because that vfunc has no return value, failure is
 * detected by demux->input_period being unchanged after the call, in which
 * case an error is logged. Otherwise the previous input period's stream
 * tasks are stopped, the collection is updated for the new input period, the
 * output period's selection is transferred to it, and the streams are
 * prepared and the tasks started. */
3704 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3706 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Remember the current input period so a change can be detected after the
 * vfunc call. */
3707 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3709 g_return_if_fail (klass->advance_period != NULL);
3711 GST_DEBUG_OBJECT (demux, "Advancing to next period");
3712 /* FIXME : no return value ? What if it fails ? */
3713 klass->advance_period (demux);
3715 if (previous_period == demux->input_period) {
3716 GST_ERROR_OBJECT (demux, "Advancing period failed");
3720 /* Stop the previous period stream tasks */
3721 gst_adaptive_demux_period_stop_tasks (previous_period);
3723 gst_adaptive_demux_update_collection (demux, demux->input_period);
3724 /* Figure out a pre-emptive selection based on the output period selection */
3725 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3726 demux->output_period);
3728 gst_adaptive_demux_prepare_streams (demux, FALSE);
3729 gst_adaptive_demux_start_tasks (demux);
3733 * gst_adaptive_demux2_get_monotonic_time:
3734 * Returns: a monotonically increasing time, using the system realtime clock
3737 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
/* A NULL demux fails the precondition check and yields GST_CLOCK_TIME_NONE;
 * otherwise the time comes from the demuxer's realtime_clock helper. */
3739 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3740 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3744 * gst_adaptive_demux2_get_client_now_utc:
3745 * @demux: #GstAdaptiveDemux
3746 * Returns: the client's estimate of UTC
3748 * Used to find the client's estimate of UTC, using the system realtime clock.
3751 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
/* Delegates to the demuxer's realtime_clock helper. Note that demux is
 * dereferenced here without a NULL check. */
3753 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3757 * gst_adaptive_demux2_is_running:
3758 * @demux: #GstAdaptiveDemux
3759 * Returns: whether the demuxer is processing data
3761 * Returns FALSE if shutdown has started (transitioning down from
3762 * PAUSED), otherwise TRUE.
3765 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
/* demux->running is read atomically, so no lock is taken here. */
3767 return g_atomic_int_get (&demux->running);
3771 * gst_adaptive_demux2_get_qos_earliest_time:
3773 * Returns: The QOS earliest time
3778 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3780 GstClockTime earliest;
/* priv->qos_earliest_time is copied into a local while the object lock is
 * held. */
3782 GST_OBJECT_LOCK (demux);
3783 earliest = demux->priv->qos_earliest_time;
3784 GST_OBJECT_UNLOCK (demux);
/* Attaches @stream to the demuxer's current input period.
 *
 * The precondition checks fail with FALSE when either pointer is NULL or
 * when the stream already has a demuxer assigned. With TRACKS_LOCK held, an
 * input period that is already marked as prepared is reported as an error.
 * Otherwise the stream's demux and period back-pointers are set, the stream
 * is appended to the input period's stream list, and every track already
 * present on the stream is added to the input period. */
3790 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3791 GstAdaptiveDemux2Stream * stream)
3793 g_return_val_if_fail (demux && stream, FALSE);
3795 /* FIXME : Migrate to parent */
3796 g_return_val_if_fail (stream->demux == NULL, FALSE);
3798 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3800 TRACKS_LOCK (demux);
/* A prepared input period means no fresh period was created to receive the
 * new stream, which is what the error message below reports. */
3801 if (demux->input_period->prepared) {
3802 GST_ERROR_OBJECT (demux,
3803 "Attempted to add streams but no new period was created");
3804 TRACKS_UNLOCK (demux);
3807 stream->demux = demux;
3808 stream->period = demux->input_period;
3809 demux->input_period->streams =
3810 g_list_append (demux->input_period->streams, stream);
/* NOTE(review): if adding a track fails below, the stream has already been
 * appended to the period and, in the code visible here, is not removed
 * again -- confirm whether callers rely on that. */
3812 if (stream->tracks) {
3814 for (iter = stream->tracks; iter; iter = iter->next)
3815 if (!gst_adaptive_demux_period_add_track (demux->input_period,
3816 (GstAdaptiveDemuxTrack *) iter->data)) {
3817 GST_ERROR_OBJECT (demux, "Failed to add track elements");
3818 TRACKS_UNLOCK (demux);
3822 TRACKS_UNLOCK (demux);
3826 /* Return the current playback rate including any instant rate multiplier */
3828 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3831 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3832 rate = demux->segment.rate * demux->instant_rate_multiplier;
3833 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);