3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative medias (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be downloaded.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses need to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. Moreover, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
/* Debug category used for logging in this file. It is declared extern here
 * and initialised in gst_adaptive_demux_class_init(). */
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
/* NOTE(review): DEFAULT_FAILED_COUNT is not referenced in the visible part of
 * this file; presumably a retry limit -- confirm against its users. */
115 #define DEFAULT_FAILED_COUNT 3
/* Defaults for the bitrate-related properties installed in class_init. The
 * "connection-bitrate" description documents 0 as "automatic". */
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
/* Defaults for the buffering properties. The *_TIME values are expressed in
 * nanoseconds (multiples of GST_SECOND). */
122 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0 /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
/* Initial values reported by the read-only current-level-time-* properties. */
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
/* Helpers for the api_lock, the mutex that serialises external API calls into
 * the demuxer (see the locking notes at the top of this file).
 *
 * GST_API_LOCK / GST_API_UNLOCK expand to a single expression WITHOUT a
 * trailing semicolon, so that `GST_API_LOCK (d);` is exactly one statement
 * and can be used safely in an unbraced if/else. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))
/* Property identifiers. They are installed in gst_adaptive_demux_class_init()
 * and dispatched on in the set_property / get_property handlers.
 * NOTE(review): PROP_MIN_BITRATE and PROP_MAX_BITRATE are used by those
 * handlers but are not visible in this excerpt of the enumeration. */
139 PROP_CONNECTION_SPEED,
140 PROP_BANDWIDTH_TARGET_RATIO,
141 PROP_CONNECTION_BITRATE,
144 PROP_CURRENT_BANDWIDTH,
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
/* Static pad templates for the output pads. One template exists per stream
 * type; the "%02u" in the name is filled in with a per-type counter when an
 * output slot is created. All templates accept ANY caps. */
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
/* An OutputSlot ties one output pad of the element to the track currently
 * routed to it. Slots are created by gst_adaptive_demux_output_slot_new() and
 * released by gst_adaptive_demux_output_slot_free(), which also reference the
 * `pad` and `type` members that are not visible in this excerpt. */
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
192 gboolean pushed_timed_data;
/* Parent class pointer, set in class_init and used to chain up in finalize
 * and change_state. */
195 static GstBinClass *parent_class = NULL;
/* Offset used by gst_adaptive_demux_get_instance_private() to locate the
 * instance-private data; adjusted in class_init when non-zero. */
196 static gint private_offset = 0;
/* GObject / GstElement / GstBin virtual method implementations. */
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
/* Pad functions installed on the sink pad and on the output (src) pads. */
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214 GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222 * demux, GstEvent * event);
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
/* Internal helpers: output task body, reset, stream preparation and task
 * management. */
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230 gboolean first_and_live);
/* Default implementations of class virtual methods; they are assigned to the
 * class structure in class_init. */
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242 gboolean stop_updates);
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249 * method to get to the padtemplates */
/* Registers (once) and returns the GType of the abstract "GstAdaptiveDemux2"
 * base class, which derives from GstBin. The one-time registration is guarded
 * by g_once_init_enter() / g_once_init_leave() on the static `type`. */
251 gst_adaptive_demux_ng_get_type (void)
253 static gsize type = 0;
255 if (g_once_init_enter (&type)) {
/* Type description: class/instance sizes and the class_init / instance init
 * functions defined in this file. */
257 static const GTypeInfo info = {
258 sizeof (GstAdaptiveDemuxClass),
261 (GClassInitFunc) gst_adaptive_demux_class_init,
264 sizeof (GstAdaptiveDemux),
266 (GInstanceInitFunc) gst_adaptive_demux_init,
269 _type = g_type_register_static (GST_TYPE_BIN,
270 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
/* Reserve room for the per-instance private structure.
 * NOTE(review): the result is presumably stored in private_offset (the
 * assignment is not visible in this excerpt) -- confirm. */
273 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
275 g_once_init_leave (&type, _type);
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
283 return (G_STRUCT_MEMBER_P (self, private_offset));
/* GObject set_property handler.
 *
 * Stores the new value of the property identified by @prop_id into the
 * matching field of the demuxer. The whole update runs with the object lock
 * held. Unknown property ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288 const GValue * value, GParamSpec * pspec)
290 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
292 GST_OBJECT_LOCK (demux);
/* "connection-speed" is exposed in kbps but stored in bits/s, hence the
 * multiplication. It shares demux->connection_speed with
 * "connection-bitrate" below. */
295 case PROP_CONNECTION_SPEED:
296 demux->connection_speed = g_value_get_uint (value) * 1000;
297 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298 demux->connection_speed);
300 case PROP_BANDWIDTH_TARGET_RATIO:
301 demux->bandwidth_target_ratio = g_value_get_float (value);
303 case PROP_MIN_BITRATE:
304 demux->min_bitrate = g_value_get_uint (value);
306 case PROP_MAX_BITRATE:
307 demux->max_bitrate = g_value_get_uint (value);
/* Same field as "connection-speed", but already in bits/s. */
309 case PROP_CONNECTION_BITRATE:
310 demux->connection_speed = g_value_get_uint (value);
312 /* FIXME: Recalculate track and buffering levels
313 * when watermarks change? */
314 case PROP_MAX_BUFFERING_TIME:
315 demux->max_buffering_time = g_value_get_uint64 (value);
317 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
320 case PROP_BUFFERING_LOW_WATERMARK_TIME:
321 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
323 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324 demux->buffering_high_watermark_fragments = g_value_get_double (value);
326 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327 demux->buffering_low_watermark_fragments = g_value_get_double (value);
330 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
334 GST_OBJECT_UNLOCK (demux);
/* GObject get_property handler.
 *
 * Copies the current value of the property identified by @prop_id from the
 * demuxer into @value. The read happens with the object lock held. Unknown
 * property ids are reported through G_OBJECT_WARN_INVALID_PROPERTY_ID. */
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339 GValue * value, GParamSpec * pspec)
341 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
343 GST_OBJECT_LOCK (demux);
/* Stored in bits/s, reported in kbps (integer division). */
346 case PROP_CONNECTION_SPEED:
347 g_value_set_uint (value, demux->connection_speed / 1000);
349 case PROP_BANDWIDTH_TARGET_RATIO:
350 g_value_set_float (value, demux->bandwidth_target_ratio);
352 case PROP_MIN_BITRATE:
353 g_value_set_uint (value, demux->min_bitrate);
355 case PROP_MAX_BITRATE:
356 g_value_set_uint (value, demux->max_bitrate);
/* Same underlying field as "connection-speed", reported unscaled. */
358 case PROP_CONNECTION_BITRATE:
359 g_value_set_uint (value, demux->connection_speed);
/* Read-only properties follow: they have no counterpart in set_property. */
361 case PROP_CURRENT_BANDWIDTH:
362 g_value_set_uint (value, demux->current_download_rate);
364 case PROP_MAX_BUFFERING_TIME:
365 g_value_set_uint64 (value, demux->max_buffering_time);
367 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
370 case PROP_BUFFERING_LOW_WATERMARK_TIME:
371 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
373 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374 g_value_set_double (value, demux->buffering_high_watermark_fragments);
376 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377 g_value_set_double (value, demux->buffering_low_watermark_fragments);
379 case PROP_CURRENT_LEVEL_TIME_VIDEO:
380 g_value_set_uint64 (value, demux->current_level_time_video);
382 case PROP_CURRENT_LEVEL_TIME_AUDIO:
383 g_value_set_uint64 (value, demux->current_level_time_audio);
386 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
390 GST_OBJECT_UNLOCK (demux);
/* Class initialiser for the adaptive demuxer base class.
 *
 * Initialises the debug category, records the parent class, adjusts the
 * private-data offset, installs the GObject properties, registers the three
 * output pad templates and hooks up the GObject / GstElement / GstBin virtual
 * methods together with the default implementations of the class's own
 * virtual methods. */
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
396 GObjectClass *gobject_class;
397 GstElementClass *gstelement_class;
398 GstBinClass *gstbin_class;
400 gobject_class = G_OBJECT_CLASS (klass);
401 gstelement_class = GST_ELEMENT_CLASS (klass);
402 gstbin_class = GST_BIN_CLASS (klass);
404 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405 "Base Adaptive Demux (ng)");
407 parent_class = g_type_class_peek_parent (klass);
409 if (private_offset != 0)
410 g_type_class_adjust_private_offset (klass, &private_offset);
412 gobject_class->set_property = gst_adaptive_demux_set_property;
413 gobject_class->get_property = gst_adaptive_demux_get_property;
414 gobject_class->finalize = gst_adaptive_demux_finalize;
/* "connection-speed" is expressed in kbps; its maximum is G_MAXUINT / 1000
 * so that the value multiplied by 1000 in set_property still fits a guint. */
416 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417 g_param_spec_uint ("connection-speed", "Connection Speed",
418 "Network connection speed to use in kbps (0 = calculate from downloaded"
419 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423 g_param_spec_float ("bandwidth-target-ratio",
424 "Ratio of target bandwidth / available bandwidth",
425 "Limit of the available bitrate to use when switching to alternates",
426 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431 "Network connection speed to use (0 = automatic) (bits/s)",
432 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437 "Minimum bitrate to use when switching to alternates (bits/s)",
438 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443 "Maximum bitrate to use when switching to alternates (bits/s)",
444 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Read-only statistic. */
447 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448 g_param_spec_uint ("current-bandwidth",
449 "Current download bandwidth (bits/s)",
450 "Report of current download bandwidth (based on arriving data) (bits/s)",
451 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/* Buffering limits; all of them are flagged GST_PARAM_MUTABLE_PLAYING. */
453 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454 g_param_spec_uint64 ("max-buffering-time",
455 "Buffering maximum size (ns)",
456 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459 G_PARAM_STATIC_STRINGS));
461 g_object_class_install_property (gobject_class,
462 PROP_BUFFERING_HIGH_WATERMARK_TIME,
463 g_param_spec_uint64 ("high-watermark-time",
464 "High buffering watermark size (ns)",
465 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_LOW_WATERMARK_TIME,
472 g_param_spec_uint64 ("low-watermark-time",
473 "Low buffering watermark size (ns)",
474 "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
/* NOTE(review): G_PARAM_READWRITE appears twice in the flags of the two
 * *-watermark-fragments properties below; the duplicate is harmless (bitwise
 * OR) but redundant. */
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481 g_param_spec_double ("high-watermark-fragments",
482 "High buffering watermark size (fragments)",
483 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("low-watermark-fragments",
491 "Low buffering watermark size (fragments)",
492 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
/* Read-only reports of the currently buffered amount per stream type. */
497 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498 g_param_spec_uint64 ("current-level-time-video",
499 "Currently buffered level of video (ns)",
500 "Currently buffered level of video track(s) (ns)",
501 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503 G_PARAM_STATIC_STRINGS));
505 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506 g_param_spec_uint64 ("current-level-time-audio",
507 "Currently buffered level of audio (ns)",
508 "Currently buffered level of audio track(s) (ns)",
509 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511 G_PARAM_STATIC_STRINGS));
/* Output pad templates (audio, video, subtitle). */
513 gst_element_class_add_static_pad_template (gstelement_class,
514 &gst_adaptive_demux_audiosrc_template);
515 gst_element_class_add_static_pad_template (gstelement_class,
516 &gst_adaptive_demux_videosrc_template);
517 gst_element_class_add_static_pad_template (gstelement_class,
518 &gst_adaptive_demux_subtitlesrc_template);
/* Virtual method overrides. */
520 gstelement_class->change_state = gst_adaptive_demux_change_state;
521 gstelement_class->query = gst_adaptive_demux_query;
522 gstelement_class->send_event = gst_adaptive_demux_send_event;
524 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
/* Default implementations for the class's own virtual methods. */
526 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527 klass->requires_periodical_playlist_update =
528 gst_adaptive_demux_requires_periodical_playlist_update_default;
529 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
/* Instance initialiser.
 *
 * Sets up the private structure, the helper objects (input adapter, realtime
 * clock, download helper, scheduler loop, flow combiner, output task), all
 * locks and condition variables, the sink pad built from the class's "sink"
 * pad template, and the default values of every property. @klass is needed
 * to look up that pad template. */
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534 GstAdaptiveDemuxClass * klass)
536 GstPadTemplate *pad_template;
538 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
540 demux->priv = gst_adaptive_demux_get_instance_private (demux);
541 demux->priv->input_adapter = gst_adapter_new ();
542 demux->realtime_clock = gst_adaptive_demux_clock_new ();
544 demux->download_helper = downloadhelper_new (demux->realtime_clock);
545 demux->priv->segment_seqnum = gst_util_seqnum_next ();
/* No group id yet; handle_incoming_manifest() picks one if none was set by
 * the time the manifest is processed. */
546 demux->have_group_id = FALSE;
547 demux->group_id = G_MAXUINT;
549 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550 demux->instant_rate_multiplier = 1.0;
552 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
553 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
/* Locks and condition variables. */
555 g_rec_mutex_init (&demux->priv->manifest_lock);
557 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
559 g_mutex_init (&demux->priv->api_lock);
560 g_mutex_init (&demux->priv->segment_lock);
562 g_mutex_init (&demux->priv->tracks_lock);
563 g_cond_init (&demux->priv->tracks_add);
565 g_mutex_init (&demux->priv->buffering_lock);
567 demux->priv->periods = g_queue_new ();
/* Sink pad: created from the "sink" template of the (sub)class; bail out if
 * the class did not register one. */
570 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
571 g_return_if_fail (pad_template != NULL);
573 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
574 gst_pad_set_event_function (demux->sinkpad,
575 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
576 gst_pad_set_chain_function (demux->sinkpad,
577 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
/* Property defaults. */
580 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
581 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
582 demux->min_bitrate = DEFAULT_MIN_BITRATE;
583 demux->max_bitrate = DEFAULT_MAX_BITRATE;
585 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
586 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
587 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
588 demux->buffering_high_watermark_fragments =
589 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
590 demux->buffering_low_watermark_fragments =
591 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
593 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
594 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
596 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
598 demux->priv->duration = GST_CLOCK_TIME_NONE;
600 /* Output combiner */
601 demux->priv->flowcombiner = gst_flow_combiner_new ();
/* Output task: runs gst_adaptive_demux_output_loop() and is bound to the
 * recursive output_lock. It is only created here, not started. */
604 g_rec_mutex_init (&demux->priv->output_lock);
605 demux->priv->output_task =
606 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
608 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
612 gst_adaptive_demux_finalize (GObject * object)
614 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
615 GstAdaptiveDemuxPrivate *priv = demux->priv;
617 GST_DEBUG_OBJECT (object, "finalize");
619 g_object_unref (priv->input_adapter);
621 downloadhelper_free (demux->download_helper);
623 g_rec_mutex_clear (&demux->priv->manifest_lock);
624 g_mutex_clear (&demux->priv->api_lock);
625 g_mutex_clear (&demux->priv->segment_lock);
627 g_mutex_clear (&demux->priv->buffering_lock);
629 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
631 /* The input period is present after a reset, clear it now */
632 if (demux->input_period)
633 gst_adaptive_demux_period_unref (demux->input_period);
635 if (demux->realtime_clock) {
636 gst_adaptive_demux_clock_unref (demux->realtime_clock);
637 demux->realtime_clock = NULL;
639 g_object_unref (priv->output_task);
640 g_rec_mutex_clear (&priv->output_lock);
642 gst_flow_combiner_free (priv->flowcombiner);
644 g_queue_free (priv->periods);
646 G_OBJECT_CLASS (parent_class)->finalize (object);
650 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
652 gboolean ret = FALSE;
653 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
655 ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
/* GstElement change_state handler.
 *
 * Performs the demuxer-specific work for a transition, then chains up to the
 * parent class, then performs the work that has to happen after the parent
 * handled the transition. `result` starts out as GST_STATE_CHANGE_FAILURE. */
660 static GstStateChangeReturn
661 gst_adaptive_demux_change_state (GstElement * element,
662 GstStateChange transition)
664 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
665 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
667 switch (transition) {
/* Refuse to start outside a streams-aware container: post an element
 * error. */
668 case GST_STATE_CHANGE_NULL_TO_READY:
669 if (!gst_adaptive_demux_check_streams_aware (demux)) {
670 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
671 (_("Element requires a streams-aware context.")), (NULL));
/* Shutting down: clear the running flag, stop the scheduler loop and the
 * download helper, mark the demuxer as flushing and signal tracks_add, stop
 * and join the output task, then reset under the API lock.
 * NOTE(review): a TRACKS_UNLOCK is visible below but the matching
 * TRACKS_LOCK is not part of this excerpt -- presumably it precedes the
 * `flushing = TRUE` assignment; confirm. */
675 case GST_STATE_CHANGE_PAUSED_TO_READY:
676 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
677 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
679 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
680 downloadhelper_stop (demux->download_helper);
683 demux->priv->flushing = TRUE;
684 g_cond_signal (&demux->priv->tracks_add);
685 gst_task_stop (demux->priv->output_task);
686 TRACKS_UNLOCK (demux);
/* Joined after TRACKS_UNLOCK, in line with rule 2 of the locking notes at the
 * top of this file. */
688 gst_task_join (demux->priv->output_task);
690 GST_API_LOCK (demux);
691 gst_adaptive_demux_reset (demux);
692 GST_API_UNLOCK (demux);
/* Starting up: reset state and start the scheduler loop under the API lock
 * (plus the manifest update task if a manifest is already available), then
 * set the running flag. The output task is not started here; see
 * gst_adaptive_demux_post_collection(). */
694 case GST_STATE_CHANGE_READY_TO_PAUSED:
695 GST_API_LOCK (demux);
696 gst_adaptive_demux_reset (demux);
698 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
699 if (g_atomic_int_get (&demux->priv->have_manifest))
700 gst_adaptive_demux_start_manifest_update_task (demux);
701 GST_API_UNLOCK (demux);
702 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
703 GST_DEBUG_OBJECT (demux, "demuxer has started running");
704 /* gst_task_start (demux->priv->output_task); */
710 /* this must be run with the scheduler and output tasks stopped. */
711 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Post-transition work, after the parent class has handled the change. */
713 switch (transition) {
714 case GST_STATE_CHANGE_READY_TO_PAUSED:
715 /* Start download task */
716 downloadhelper_start (demux->download_helper);
/* Tears down an output slot.
 *
 * Pushes an EOS event (carrying the demuxer's current segment seqnum) on the
 * slot's pad, deactivates the pad, removes it from the flow combiner and from
 * the element, drops the slot's track references and frees the slot itself.
 * The slot must not be used after this call. */
727 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
730 GstEvent *eos = gst_event_new_eos ();
731 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
733 /* FIXME: The slot might not have output any data, caps or segment yet */
734 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
735 gst_pad_push_event (slot->pad, eos);
736 gst_pad_set_active (slot->pad, FALSE);
737 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
738 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
/* slot->track is unreffed unconditionally, pending_track only when set. */
740 gst_adaptive_demux_track_unref (slot->track);
741 if (slot->pending_track)
742 gst_adaptive_demux_track_unref (slot->pending_track);
744 g_slice_free (OutputSlot, slot);
/* Creates a new output slot, and its source pad, for the given stream type.
 *
 * The pad name is built from the stream type and a per-type counter kept in
 * the private structure (the counter is incremented on every call). Only
 * audio, video and text stream types are supported; any other type hits
 * g_assert_not_reached(). The new pad is added to the element and to the
 * flow combiner, activated, and the slot is stored as the pad's element
 * private data so that pad functions can get back to it. */
748 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
749 GstStreamType streamtype)
752 GstPadTemplate *tmpl;
755 switch (streamtype) {
756 case GST_STREAM_TYPE_AUDIO:
757 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
759 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
761 case GST_STREAM_TYPE_VIDEO:
762 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
764 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
766 case GST_STREAM_TYPE_TEXT:
768 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
770 gst_static_pad_template_get
771 (&gst_adaptive_demux_subtitlesrc_template);
774 g_assert_not_reached ();
/* Zero-initialised allocation; released with g_slice_free() in
 * gst_adaptive_demux_output_slot_free(). */
778 slot = g_slice_new0 (OutputSlot);
779 slot->type = streamtype;
780 slot->pushed_timed_data = FALSE;
782 /* Create and activate new pads */
783 slot->pad = gst_pad_new_from_template (tmpl, name);
785 gst_object_unref (tmpl);
787 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
788 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
789 gst_pad_set_active (slot->pad, TRUE);
/* NOTE(review): the query/event functions are installed after the pad has
 * been added and activated. */
791 gst_pad_set_query_function (slot->pad,
792 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
793 gst_pad_set_event_function (slot->pad,
794 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
796 gst_pad_set_element_private (slot->pad, slot);
798 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
799 GST_DEBUG_PAD_NAME (slot->pad));
804 * * After `process_manifest` or when a period starts
805 * * Or when all tracks have been created
807 * Goes over tracks and creates the collection
809 * Returns TRUE if the collection was fully created.
811 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Rebuilds the GstStreamCollection of @period from its list of tracks.
 *
 * Nothing is rebuilt when the period's tracks did not change, when the period
 * has no tracks, or when some of its tracks are still pending. Otherwise a
 * new collection named "adaptivedemux" is filled with a new reference to the
 * stream object of every track and replaces the period's previous collection
 * (which is unreffed). */
814 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
815 GstAdaptiveDemuxPeriod * period)
817 GstStreamCollection *collection;
820 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
822 if (!period->tracks_changed) {
823 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
827 if (!period->tracks) {
828 GST_WARNING_OBJECT (demux, "No tracks registered/present");
832 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
833 GST_DEBUG_OBJECT (demux,
834 "Streams still have pending tracks, not creating/updating collection");
838 /* Update collection */
839 collection = gst_stream_collection_new ("adaptivedemux");
841 for (iter = period->tracks; iter; iter = iter->next) {
842 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
844 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
/* A new reference to the stream object is passed to the collection. */
845 gst_stream_collection_add_stream (collection,
846 gst_object_ref (track->stream_object));
/* Swap in the new collection, releasing the previous one if any. */
849 if (period->collection)
850 gst_object_unref (period->collection);
851 period->collection = collection;
857 * Called for the output period:
858 * * after `update_collection()` if the input period is the same as the output period
859 * * When the output period changes
861 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Posts the stream collection of the current output period on the bus.
 *
 * The tracks and manifest locks are released while the message is posted and
 * the manifest lock is re-taken afterwards. If no stream selection was
 * requested in the meantime, the default tracks of the output period are
 * selected. Finally the output task is started if the demuxer is running.
 *
 * Requires demux->output_period to be set (checked with
 * g_return_val_if_fail). */
864 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
866 GstStreamCollection *collection;
867 GstAdaptiveDemuxPeriod *period = demux->output_period;
/* Snapshot of the selection seqnum taken before the locks are dropped; it is
 * compared again after posting to detect a selection request that arrived
 * in between. */
868 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
870 g_return_val_if_fail (period, FALSE);
871 if (!period->collection) {
872 GST_DEBUG_OBJECT (demux, "No collection available yet");
876 collection = period->collection;
878 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
881 /* Post collection */
/* NOTE(review): `collection` is a borrowed pointer from the period and is
 * used after both locks have been released -- confirm nothing can replace
 * period->collection concurrently at this point. */
882 TRACKS_UNLOCK (demux);
883 GST_MANIFEST_UNLOCK (demux);
885 gst_element_post_message (GST_ELEMENT_CAST (demux),
886 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
888 GST_MANIFEST_LOCK (demux);
891 /* If no stream selection was handled, make a default selection */
892 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
893 gst_adaptive_demux_period_select_default_tracks (demux,
894 demux->output_period);
897 /* Make sure the output task is running */
898 if (gst_adaptive_demux2_is_running (demux)) {
899 demux->priv->flushing = FALSE;
900 GST_DEBUG_OBJECT (demux, "Starting the output task");
901 gst_task_start (demux->priv->output_task);
/* Processes the manifest data that was accumulated in the input adapter.
 *
 * With the API and manifest locks held, this queries the sink pad's peer for
 * the manifest URI (a permanent redirect replaces the manifest URI, any other
 * redirection target is kept as base URI), hands the buffered data to the
 * subclass through the process_manifest() vfunc and, when the input period
 * has streams, makes that period the output period, posts the stream
 * collection and starts the streaming and manifest update tasks.
 *
 * gst_adaptive_demux_sink_event() calls this with the scheduler lock held and
 * falls back to the default event handling when the result is false. */
908 handle_incoming_manifest (GstAdaptiveDemux * demux)
910 GstAdaptiveDemuxClass *demux_class;
915 GstBuffer *manifest_buffer;
/* The API lock is taken before the manifest lock; both are released in the
 * opposite order on the exit paths visible below. */
917 GST_API_LOCK (demux);
918 GST_MANIFEST_LOCK (demux);
920 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Amount of manifest data currently stored in the input adapter */
922 available = gst_adapter_available (demux->priv->input_adapter);
925 goto eos_without_data;
927 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
929 /* Need to get the URI to use it as a base to generate the fragment's
931 query = gst_query_new_uri ();
932 query_res = gst_pad_peer_query (demux->sinkpad, query);
934 gchar *uri, *redirect_uri;
937 gst_query_parse_uri (query, &uri);
938 gst_query_parse_uri_redirection (query, &redirect_uri);
939 gst_query_parse_uri_redirection_permanent (query, &permanent);
941 if (permanent && redirect_uri) {
942 demux->manifest_uri = redirect_uri;
943 demux->manifest_base_uri = NULL;
946 demux->manifest_uri = uri;
947 demux->manifest_base_uri = redirect_uri;
950 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
951 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
953 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
955 gst_query_unref (query);
957 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
958 if (!demux->have_group_id) {
959 demux->have_group_id = TRUE;
960 demux->group_id = gst_util_group_id_next ();
963 /* Let the subclass parse the manifest */
965 gst_adapter_take_buffer (demux->priv->input_adapter, available);
966 ret = demux_class->process_manifest (demux, manifest_buffer);
967 gst_buffer_unref (manifest_buffer);
/* Post an element message carrying statistics about the manifest download */
969 gst_element_post_message (GST_ELEMENT_CAST (demux),
970 gst_message_new_element (GST_OBJECT_CAST (demux),
971 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
972 "manifest-uri", G_TYPE_STRING,
973 demux->manifest_uri, "uri", G_TYPE_STRING,
975 "manifest-download-start", GST_TYPE_CLOCK_TIME,
977 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
978 gst_util_get_timestamp (), NULL)));
981 goto invalid_manifest;
983 /* Streams should have been added to the input period if the manifest parsing
985 if (!demux->input_period->streams)
988 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
990 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
991 /* Send duration message */
/* Only done for non-live streams: the duration reported by the subclass is
 * cached in priv->duration and announced when it is known. */
992 if (!gst_adaptive_demux_is_live (demux)) {
993 GstClockTime duration = demux_class->get_duration (demux);
995 demux->priv->duration = duration;
996 if (duration != GST_CLOCK_TIME_NONE) {
997 GST_DEBUG_OBJECT (demux,
998 "Sending duration message : %" GST_TIME_FORMAT,
999 GST_TIME_ARGS (duration));
1000 gst_element_post_message (GST_ELEMENT (demux),
1001 gst_message_new_duration_changed (GST_OBJECT (demux)));
1003 GST_DEBUG_OBJECT (demux,
1004 "media duration unknown, can not send the duration message");
1008 TRACKS_LOCK (demux);
1009 /* New streams/tracks will have been added to the input period */
1010 /* The input period has streams, make it the active output period */
1011 /* FIXME : Factorize this into a function to make a period active */
/* The output period holds its own reference on the period */
1012 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1013 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1014 gst_adaptive_demux_post_collection (demux);
1015 TRACKS_UNLOCK (demux);
/* Prepare the streams of the input period (first_and_live is set when the
 * stream is live), then start the tasks and the manifest update task */
1017 gst_adaptive_demux_prepare_streams (demux,
1018 gst_adaptive_demux_is_live (demux));
1019 gst_adaptive_demux_start_tasks (demux);
1020 gst_adaptive_demux_start_manifest_update_task (demux);
1023 GST_MANIFEST_UNLOCK (demux);
1024 GST_API_UNLOCK (demux);
1031 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1039 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1040 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1041 (_("This file contains no playable streams.")),
1042 ("No known stream formats found at the Manifest"));
1049 GST_MANIFEST_UNLOCK (demux);
1050 GST_API_UNLOCK (demux);
1052 /* In most cases, this will happen if we set a wrong url in the
1053 * source element and we have received the 404 HTML response instead of
1055 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1060 struct http_headers_collector
1062 GstAdaptiveDemux *demux;
1067 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1068 const GValue * value, gpointer userdata)
1070 struct http_headers_collector *hdr_data = userdata;
1071 GstAdaptiveDemux *demux = hdr_data->demux;
1072 const gchar *field_name = g_quark_to_string (field_id);
1074 if (G_UNLIKELY (value == NULL))
1075 return TRUE; /* This should not happen */
1077 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1078 const gchar *user_agent = g_value_get_string (value);
1080 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1081 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1084 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1085 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1086 guint i = 0, prev_len = 0, total_len = 0;
1087 gchar **cookies = NULL;
1089 if (hdr_data->cookies != NULL)
1090 prev_len = g_strv_length (hdr_data->cookies);
1092 if (GST_VALUE_HOLDS_ARRAY (value)) {
1093 total_len = gst_value_array_get_size (value) + prev_len;
1094 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1096 for (i = 0; i < gst_value_array_get_size (value); i++) {
1097 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1098 g_value_get_string (gst_value_array_get_value (value, i)));
1099 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1101 } else if (G_VALUE_HOLDS_STRING (value)) {
1102 total_len = 1 + prev_len;
1103 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1105 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1106 g_value_get_string (value));
1107 cookies[0] = g_value_dup_string (value);
1109 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1110 g_quark_to_string (field_id));
1116 for (j = 0; j < prev_len; j++) {
1117 GST_DEBUG_OBJECT (demux,
1118 "Append existing cookie %s", hdr_data->cookies[j]);
1119 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1122 cookies[total_len] = NULL;
1124 g_strfreev (hdr_data->cookies);
1125 hdr_data->cookies = cookies;
1129 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1130 const gchar *referer = g_value_get_string (value);
1131 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1133 downloadhelper_set_referer (demux->download_helper, referer);
1136 /* Date header can be used to estimate server offset */
1137 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1138 const gchar *http_date = g_value_get_string (value);
1141 GstDateTime *datetime =
1142 gst_adaptive_demux_util_parse_http_head_date (http_date);
1145 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1146 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1148 GST_INFO_OBJECT (demux,
1149 "HTTP response Date %s", GST_STR_NULL (date_string));
1150 g_free (date_string);
1152 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1154 g_date_time_unref (utc_now);
1155 gst_date_time_unref (datetime);
/* Event handler of the sink pad, on which the manifest is received.
 *
 * Flush-stop resets the demuxer, stream-start and segment events are consumed
 * here, and "http-headers" sticky events are used to configure the download
 * helper. Other paths end in gst_pad_event_default(). */
1164 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1167 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1170 switch (event->type) {
1171 case GST_EVENT_FLUSH_STOP:{
/* Reset the demuxer under the API and manifest locks, forwarding the event
 * with the default handler while both are still held */
1172 GST_API_LOCK (demux);
1173 GST_MANIFEST_LOCK (demux);
1175 gst_adaptive_demux_reset (demux);
1177 ret = gst_pad_event_default (pad, parent, event);
1179 GST_MANIFEST_UNLOCK (demux);
1180 GST_API_UNLOCK (demux);
/* NOTE(review): the case label of the following branch is not visible in
 * this excerpt; the debug output of handle_incoming_manifest() suggests it is
 * the EOS that ends the manifest data. To be confirmed.
 * The manifest is processed with the scheduler lock held; when the lock
 * cannot be taken or processing fails, the event goes to the default
 * handler, otherwise it is dropped here. */
1186 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1187 if (!handle_incoming_manifest (demux)) {
1188 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1189 return gst_pad_event_default (pad, parent, event);
1191 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1193 GST_ERROR_OBJECT (demux,
1194 "Failed to acquire scheduler to handle manifest");
1195 return gst_pad_event_default (pad, parent, event);
1197 gst_event_unref (event);
1200 case GST_EVENT_STREAM_START:
/* Remember upstream's group id if the event carries one */
1201 if (gst_event_parse_group_id (event, &demux->group_id))
1202 demux->have_group_id = TRUE;
1204 demux->have_group_id = FALSE;
1205 /* Swallow stream-start, we'll push our own */
1206 gst_event_unref (event);
1208 case GST_EVENT_SEGMENT:
1209 /* Swallow newsegments, we'll push our own */
1210 gst_event_unref (event);
1212 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1213 const GstStructure *structure = gst_event_get_structure (event);
/* Collector shared by the request and response header iterations below;
 * it starts without any cookies */
1214 struct http_headers_collector c = { demux, NULL };
1216 if (gst_structure_has_name (structure, "http-headers")) {
1217 if (gst_structure_has_field (structure, "request-headers")) {
1218 GstStructure *req_headers = NULL;
1219 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1220 &req_headers, NULL);
/* Each header field is passed to the header callback, then the structure
 * obtained from gst_structure_get() is freed */
1222 gst_structure_foreach (req_headers,
1223 gst_adaptive_demux_handle_upstream_http_header, &c);
1224 gst_structure_free (req_headers);
1227 if (gst_structure_has_field (structure, "response-headers")) {
1228 GstStructure *res_headers = NULL;
1229 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1230 &res_headers, NULL);
1232 gst_structure_foreach (res_headers,
1233 gst_adaptive_demux_handle_upstream_http_header, &c);
1234 gst_structure_free (res_headers);
/* Hand the cookies gathered from both header sets to the download helper */
1239 downloadhelper_set_cookies (demux->download_helper, c.cookies);
/* Fallback: forward the event with the default pad event handler */
1247 return gst_pad_event_default (pad, parent, event);
/* Chain function of the sink pad: every incoming buffer is pushed into the
 * input adapter while the manifest lock is held, so that
 * handle_incoming_manifest() can later take the whole manifest out of the
 * adapter in one piece. */
1250 static GstFlowReturn
1251 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1254 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1256 GST_MANIFEST_LOCK (demux);
/* The adapter receives the buffer; it is not used again in this function */
1258 gst_adapter_push (demux->priv->input_adapter, buffer);
1260 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1261 (gint) gst_adapter_available (demux->priv->input_adapter));
1263 GST_MANIFEST_UNLOCK (demux);
1268 /* Called with TRACKS_LOCK taken */
/* Flushes every track of @period. A track whose sinkpad is active
 * additionally gets that pad deactivated and activated again. */
1270 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1274 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1275 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1277 gst_adaptive_demux_track_flush (track);
/* Cycle an active sinkpad through inactive and back to active */
1278 if (gst_pad_is_active (track->sinkpad)) {
1279 gst_pad_set_active (track->sinkpad, FALSE);
1280 gst_pad_set_active (track->sinkpad, TRUE);
1285 /* Resets all tracks to their initial state, ready to receive new data. */
/* Takes the tracks lock itself and applies
 * gst_adaptive_demux_period_reset_tracks() to every period queued in
 * priv->periods. */
1287 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1289 TRACKS_LOCK (demux);
1290 g_queue_foreach (demux->priv->periods,
1291 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1292 TRACKS_UNLOCK (demux);
1295 /* Subclasses will call this function to ensure that a new input period is
1296 * available to receive new streams and tracks */
/* An input period that has not been prepared yet is kept in use (see the
 * debug message below). Otherwise the current input period, if any, is
 * flagged as having a next period and a new period becomes the input
 * period. */
1298 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1300 if (demux->input_period && !demux->input_period->prepared) {
1301 GST_DEBUG_OBJECT (demux, "Using existing input period");
1305 if (demux->input_period) {
1306 GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1307 demux->input_period->has_next_period = TRUE;
1309 GST_DEBUG_OBJECT (demux, "Setting up new period");
1311 demux->input_period = gst_adaptive_demux_period_new (demux);
1316 /* must be called with manifest_lock taken */
/* Brings the demuxer back to its initial state: stops the tasks, lets the
 * subclass reset itself, frees all output slots and periods, starts a fresh
 * input period and clears the manifest, segment, buffering and stream
 * bookkeeping. */
1318 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1320 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1323 gst_adaptive_demux_stop_tasks (demux, TRUE);
/* Subclass specific reset */
1326 klass->reset (demux);
1328 /* Disable and remove all outputs */
1329 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1330 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1331 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1333 g_list_free (demux->priv->outputs);
1334 demux->priv->outputs = NULL;
/* Empty the period queue, unreffing every queued period */
1336 g_queue_clear_full (demux->priv->periods,
1337 (GDestroyNotify) gst_adaptive_demux_period_unref);
1339 /* The output period always has an extra ref taken on it */
1340 if (demux->output_period)
1341 gst_adaptive_demux_period_unref (demux->output_period);
1342 demux->output_period = NULL;
1343 /* The input period doesn't have an extra ref taken on it */
1344 demux->input_period = NULL;
/* With input_period cleared above, this sets up a brand new input period */
1346 gst_adaptive_demux_start_new_period (demux);
/* Forget the manifest location and any buffered manifest data */
1348 g_free (demux->manifest_uri);
1349 g_free (demux->manifest_base_uri);
1350 demux->manifest_uri = NULL;
1351 demux->manifest_base_uri = NULL;
1353 gst_adapter_clear (demux->priv->input_adapter);
1354 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
/* Playback state: TIME segment, rate multiplier 1.0, unknown duration */
1356 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1357 demux->instant_rate_multiplier = 1.0;
1359 demux->priv->duration = GST_CLOCK_TIME_NONE;
/* Buffering starts over: no percentage yet and marked as buffering */
1361 demux->priv->percent = -1;
1362 demux->priv->is_buffering = TRUE;
/* No group id is known any more and a new segment seqnum is picked */
1364 demux->have_group_id = FALSE;
1365 demux->group_id = G_MAXUINT;
1366 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1368 demux->priv->global_output_position = 0;
/* Per-type stream counters */
1370 demux->priv->n_audio_streams = 0;
1371 demux->priv->n_video_streams = 0;
1372 demux->priv->n_subtitle_streams = 0;
1374 gst_flow_combiner_reset (demux->priv->flowcombiner);
/* Element-level query handler.
 *
 * BUFFERING queries get special treatment while no output period exists yet:
 * the format of the query is inspected and, per the debug messages, only
 * TIME queries are answered in that state. The final statement hands the
 * query to the parent class implementation. */
1378 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1380 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1382 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1384 switch (GST_QUERY_TYPE (query)) {
1385 case GST_QUERY_BUFFERING:
/* Only the format of the buffering range is needed here */
1388 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1390 if (!demux->output_period) {
1391 if (format != GST_FORMAT_TIME) {
1392 GST_DEBUG_OBJECT (demux,
1393 "No period setup yet, can't answer non-TIME buffering queries");
1397 GST_DEBUG_OBJECT (demux,
1398 "No period setup yet, but still answering buffering query");
1406 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
/* Element-level send_event handler: seek and select-streams events are
 * dispatched to the dedicated handlers of the demuxer, any other event is
 * passed to the parent class implementation. The result of whichever handler
 * ran is stored in res. */
1410 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1412 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1413 gboolean res = FALSE;
1415 GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1417 switch (GST_EVENT_TYPE (event)) {
1418 case GST_EVENT_SEEK:
1420 res = gst_adaptive_demux_handle_seek_event (demux, event);
1423 case GST_EVENT_SELECT_STREAMS:
1425 res = gst_adaptive_demux_handle_select_streams_event (demux, event);
/* Everything else is handled by the parent class */
1429 res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1435 /* MANIFEST_LOCK held. Find the stream that owns the given element */
/* A stream matches when @o has the parsebin of that stream as ancestor.
 * Callers in this file compare the result against NULL to detect that no
 * stream matched. */
1436 static GstAdaptiveDemux2Stream *
1437 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1441 /* We only look in the streams of the input period (i.e. with active streams) */
1442 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1443 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1444 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
/* Handles a stream-collection message posted from within one of the streams.
 *
 * The owning stream is looked up under the manifest lock and given the
 * collection with the tracks lock held. When this activated pending tracks,
 * the collection of the input period is updated (and posted if the input
 * period is also the output period), and once no pending tracks remain the
 * other selected streams of the input period are started. The collection and
 * the message are unreffed at the end. */
1453 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1456 GstAdaptiveDemux2Stream *stream;
1457 GstStreamCollection *collection = NULL;
1458 gboolean pending_tracks_activated = FALSE;
1460 GST_MANIFEST_LOCK (demux);
/* Map the element that posted the message back to its stream */
1462 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1463 if (stream == NULL) {
1464 GST_WARNING_OBJECT (demux,
1465 "Failed to locate stream for collection message");
1469 gst_message_parse_stream_collection (msg, &collection);
1473 TRACKS_LOCK (demux);
/* A collection the stream cannot handle is reported as an element error,
 * after the tracks lock has been released */
1475 if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1476 &pending_tracks_activated)) {
1477 TRACKS_UNLOCK (demux);
1479 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1480 (_("Stream format can't be handled")),
1481 ("The streams provided by the multiplex are ambiguous"));
1485 if (pending_tracks_activated) {
1486 /* If pending tracks were handled, then update the demuxer collection */
1487 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1488 demux->input_period == demux->output_period) {
1489 gst_adaptive_demux_post_collection (demux);
1492 /* If we discovered pending tracks and we no longer have any, we can ensure
1493 * selected tracks are started */
1494 if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1495 GList *iter = demux->input_period->streams;
1496 for (; iter; iter = iter->next) {
1497 GstAdaptiveDemux2Stream *new_stream = iter->data;
1499 /* The stream that posted this collection was already started. If a
1500 * different stream is now selected, start it */
1501 if (stream != new_stream
1502 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1503 gst_adaptive_demux2_stream_start (new_stream);
1507 TRACKS_UNLOCK (demux);
1510 GST_MANIFEST_UNLOCK (demux);
/* Release the parsed collection and the message itself */
1513 gst_object_unref (collection);
1514 gst_message_unref (msg);
/* GstBin message handler.
 *
 * Stream-collection messages are handed to
 * gst_adaptive_demux_handle_stream_collection_msg(). Error messages are
 * mapped back to the stream owning the source element so that the stream can
 * process the error (see the "ask to retry" step below). The last statement
 * passes the message to the parent class handler. */
1519 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1521 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1523 switch (GST_MESSAGE_TYPE (msg)) {
1524 case GST_MESSAGE_STREAM_COLLECTION:
1526 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1529 case GST_MESSAGE_ERROR:{
1530 GstAdaptiveDemux2Stream *stream = NULL;
1532 gchar *debug = NULL;
1533 gchar *new_error = NULL;
1534 const GstStructure *details = NULL;
1536 GST_MANIFEST_LOCK (demux);
/* Find the stream the failing element belongs to */
1538 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1539 if (stream == NULL) {
1540 GST_WARNING_OBJECT (demux,
1541 "Failed to locate stream for errored element");
1542 GST_MANIFEST_UNLOCK (demux);
1546 gst_message_parse_error (msg, &err, &debug);
1548 GST_WARNING_OBJECT (demux,
1549 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1550 err->message, debug);
/* Fold the debug string into the error message, replacing the original
 * message text */
1553 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1555 g_free (err->message);
1556 err->message = new_error;
/* Record the HTTP status code from the error details on the stream */
1559 gst_message_parse_error_details (msg, &details);
1561 gst_structure_get_uint (details, "http-status-code",
1562 &stream->last_status_code);
1565 /* error, but ask to retry */
/* The stream processes the error with the scheduler lock held */
1566 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1567 gst_adaptive_demux2_stream_parse_error (stream, err);
1568 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1574 GST_MANIFEST_UNLOCK (demux);
1576 gst_message_unref (msg);
1585 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1588 /* must be called with manifest_lock taken */
/* Returns the period start time reported by the subclass through its
 * get_period_start_time() vfunc. The vfunc is optional: a subclass that
 * leaves it NULL is handled separately before the call. */
1590 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1592 GstAdaptiveDemuxClass *klass;
1594 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1596 if (klass->get_period_start_time == NULL)
1599 return klass->get_period_start_time (demux);
/* Prepares all streams of the input period for streaming.
 *
 * Every stream is flagged as needing a header and being discontinuous. The
 * smallest fragment stream time among the streams that were asked for their
 * fragment info is collected in min_stream_time. For a first, live period
 * (@first_and_live) the demuxer segment is seeked to that time plus the
 * period start. All streams then get a common start/current position and the
 * period is marked as prepared. */
1602 /* must be called with manifest_lock taken */
1604 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1605 gboolean first_and_live)
1608 GstClockTime period_start;
1609 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
/* The input period must have streams and must not be prepared yet */
1612 g_return_val_if_fail (demux->input_period->streams, FALSE);
1613 g_assert (demux->input_period->prepared == FALSE);
1615 new_streams = demux->input_period->streams;
1617 if (!gst_adaptive_demux2_is_running (demux)) {
1618 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1622 GST_DEBUG_OBJECT (demux,
1623 "Preparing %d streams for period %d , first_and_live:%d",
1624 g_list_length (new_streams), demux->input_period->period_num,
1627 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1628 GstAdaptiveDemux2Stream *stream = iter->data;
1630 GST_DEBUG_OBJECT (stream, "Preparing stream");
1632 stream->need_header = TRUE;
1633 stream->discont = TRUE;
1635 /* Grab the first stream time for live streams
1636 * * If the stream is selected
1637 * * Or it provides dynamic tracks (in which case we need to force an update)
1640 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1641 || stream->pending_tracks)) {
1642 /* TODO we only need the first timestamp, maybe create a simple function to
1643 * get the current PTS of a fragment ? */
1644 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1645 gst_adaptive_demux2_stream_update_fragment_info (stream);
1647 GST_DEBUG_OBJECT (stream,
1648 "Got stream time %" GST_STIME_FORMAT,
1649 GST_STIME_ARGS (stream->fragment.stream_time));
/* Keep the smallest fragment stream time seen so far */
1651 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1652 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1654 min_stream_time = stream->fragment.stream_time;
1659 period_start = gst_adaptive_demux_get_period_start_time (demux);
1661 /* For live streams, the subclass is supposed to seek to the current fragment
1662 * and then tell us its stream time in stream->fragment.stream_time. We now
1663 * also have to seek our demuxer segment to reflect this.
1665 * FIXME: This needs some refactoring at some point.
1667 if (first_and_live) {
1668 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1669 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1670 GST_SEEK_TYPE_NONE, -1, NULL);
1673 GST_DEBUG_OBJECT (demux,
1674 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1675 " demux segment %" GST_SEGMENT_FORMAT,
1676 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1679 /* Synchronize stream start/current positions */
1680 if (min_stream_time == GST_CLOCK_STIME_NONE)
1681 min_stream_time = period_start;
1683 min_stream_time += period_start;
/* All streams of the period share the same start and current position */
1684 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1685 GstAdaptiveDemux2Stream *stream = iter->data;
1686 stream->start_position = stream->current_position = min_stream_time;
/* Each stream has to compute its segment and remembers first_and_live */
1689 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1690 GstAdaptiveDemux2Stream *stream = iter->data;
1691 stream->compute_segment = TRUE;
1692 stream->first_and_live = first_and_live;
/* Clear the earliest QoS time and mark the input period as prepared */
1694 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1695 demux->input_period->prepared = TRUE;
/* Searches the tracks of @period for the one whose stream_id equals
 * @stream_id. The comparison uses g_strcmp0(), so NULL ids are tolerated. */
1700 static GstAdaptiveDemuxTrack *
1701 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1705 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1706 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1707 if (!g_strcmp0 (track->stream_id, stream_id))
1714 /* TRACKS_LOCK hold */
/* Recomputes the buffering state from the tracks of the output period.
 *
 * Updates the current video/audio level properties of the demuxer, and
 * priv->percent, priv->is_buffering and priv->percent_changed. Posting the
 * resulting buffering message is not done here. */
1716 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1718 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1719 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1720 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1722 gint min_percent = -1, percent;
1723 gboolean all_eos = TRUE;
1725 /* Go over all active tracks of the output period and update level */
1727 /* Check that all tracks are above their respective low thresholds (different
1728 * tracks may have different fragment durations yielding different buffering
1729 * percentages) Overall buffering percent is the lowest. */
1730 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1731 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1733 GST_LOG_OBJECT (demux,
1734 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1735 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1736 track->stream_id, track->period_num, track->active, track->selected,
1737 track->eos, GST_TIME_ARGS (track->level_time),
1738 GST_TIME_ARGS (track->buffering_threshold));
/* Only tracks that are both active and selected are taken into account */
1740 if (track->active && track->selected) {
/* Lowest level among the considered tracks */
1745 if (min_level_time == GST_CLOCK_TIME_NONE) {
1746 min_level_time = track->level_time;
1747 } else if (track->level_time < min_level_time) {
1748 min_level_time = track->level_time;
/* Per-type levels are lowered whenever a track of that type reports less */
1751 if (track->type & GST_STREAM_TYPE_VIDEO
1752 && video_level_time > track->level_time)
1753 video_level_time = track->level_time;
1755 if (track->type & GST_STREAM_TYPE_AUDIO
1756 && audio_level_time > track->level_time)
1757 audio_level_time = track->level_time;
/* Percentage of this track: its level relative to its buffering threshold.
 * min_percent keeps the smallest value over all tracks. */
1759 if (track->level_time != GST_CLOCK_TIME_NONE
1760 && track->buffering_threshold != 0) {
1762 gst_util_uint64_scale (track->level_time, 100,
1763 track->buffering_threshold);
1764 if (min_percent < 0 || cur_percent < min_percent)
1765 min_percent = cur_percent;
1771 GST_DEBUG_OBJECT (demux,
1772 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1773 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1775 /* Update demuxer video/audio level properties */
1776 GST_OBJECT_LOCK (demux);
1777 demux->current_level_time_video = video_level_time;
1778 demux->current_level_time_audio = audio_level_time;
1779 GST_OBJECT_UNLOCK (demux);
1781 if (min_percent < 0 && !all_eos)
1784 if (min_percent > 100 || all_eos)
/* The percentage never goes below 0 */
1787 percent = MAX (0, min_percent);
1789 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
/* Buffering state: the first branch covers the case where buffering is in
 * progress (and may clear is_buffering), the second one enters buffering
 * once the percentage is below 1. In both branches a changed percentage is
 * stored and flagged through percent_changed. */
1791 if (demux->priv->is_buffering) {
1793 demux->priv->is_buffering = FALSE;
1794 if (demux->priv->percent != percent) {
1795 demux->priv->percent = percent;
1796 demux->priv->percent_changed = TRUE;
1798 } else if (percent < 1) {
1799 demux->priv->is_buffering = TRUE;
1800 if (demux->priv->percent != percent) {
1801 demux->priv->percent = percent;
1802 demux->priv->percent_changed = TRUE;
1806 if (demux->priv->percent_changed)
1807 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1808 demux->priv->is_buffering);
1811 /* With TRACKS_LOCK held */
/* Posts a buffering message for priv->percent when percent_changed is set.
 *
 * The tracks lock is released while the message is posted (the buffering
 * lock is held across the post) and re-acquired afterwards. */
1813 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1818 if (!demux->priv->percent_changed)
1821 BUFFERING_LOCK (demux);
/* Snapshot the percentage before the tracks lock is dropped */
1822 percent = demux->priv->percent;
1823 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1824 TRACKS_UNLOCK (demux);
1825 gst_element_post_message ((GstElement *) demux, msg);
1827 BUFFERING_UNLOCK (demux);
1828 TRACKS_LOCK (demux);
/* Only clear the flag if the percentage did not change in the meantime */
1829 if (percent == demux->priv->percent)
1830 demux->priv->percent_changed = FALSE;
1833 /* MANIFEST_LOCK and TRACKS_LOCK hold */
/* Looks through the streams of the output period for the one whose track
 * list contains @track. */
1834 GstAdaptiveDemux2Stream *
1835 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1836 GstAdaptiveDemuxTrack * track)
1840 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1841 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1842 if (g_list_find (stream->tracks, track))
1849 /* Called from seek handler
1851 * This function is used when a (flushing) seek caused a new period to be activated.
1853 * This will ensure that:
1854 * * the current output period is marked as finished (EOS)
1855 * * Any potential intermediate (non-input/non-output) periods are removed
1856 * * That the new input period is prepared and ready
1859 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1863 GST_DEBUG_OBJECT (demux,
1864 "Preparing new input period %u", demux->input_period->period_num);
1866 /* Prepare the new input period */
1867 gst_adaptive_demux_update_collection (demux, demux->input_period);
1869 /* Transfer the previous selection to the new input period */
1870 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1871 demux->output_period);
/* FALSE: the new period is not prepared as a first, live period */
1872 gst_adaptive_demux_prepare_streams (demux, FALSE);
1874 /* Remove all periods except for the input (last) and output (first) period */
/* Index 1 is the period right after the output period, so popping it until
 * two periods are left removes every intermediate period */
1875 while (demux->priv->periods->length > 2) {
1876 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1877 /* Mark all tracks of the removed period as not selected and EOS so they
1878 * will be skipped / ignored */
1879 for (iter = period->tracks; iter; iter = iter->next) {
1880 GstAdaptiveDemuxTrack *track = iter->data;
1881 track->selected = FALSE;
/* Release the reference that came out of the queue */
1884 gst_adaptive_demux_period_unref (period);
1887 /* Mark all tracks of the output period as EOS so that the output loop
1888 * will immediately move to the new period */
1889 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1890 GstAdaptiveDemuxTrack *track = iter->data;
1894 /* Go over all slots, and clear any pending track */
1895 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1896 OutputSlot *slot = (OutputSlot *) iter->data;
1898 if (slot->pending_track != NULL) {
1899 GST_DEBUG_OBJECT (demux,
1900 "Removing track '%s' as pending from output of current track '%s'",
1901 slot->pending_track->stream_id, slot->track->stream_id);
/* Release the pending track and clear the slot's pointer to it */
1902 gst_adaptive_demux_track_unref (slot->pending_track);
1903 slot->pending_track = NULL;
1908 /* must be called with manifest_lock taken */
/* Thin wrapper around the get_live_seek_range() vfunc of the subclass, which
 * fills @range_start and @range_stop. Fails with FALSE (through
 * g_return_val_if_fail) when the subclass does not implement the vfunc. */
1910 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1911 gint64 * range_start, gint64 * range_stop)
1913 GstAdaptiveDemuxClass *klass;
1915 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1917 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1919 return klass->get_live_seek_range (demux, range_start, range_stop);
1922 /* must be called with manifest_lock taken */
/* Checks whether the current position of @stream lies within the live seek
 * range of @demux (both range bounds included). The comparison is only made
 * when the live seek range could be obtained. */
1924 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1925 GstAdaptiveDemux2Stream * stream)
1927 gint64 range_start, range_stop;
1928 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1929 GST_LOG_OBJECT (stream,
1930 "stream position %" GST_TIME_FORMAT " live seek range %"
1931 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1932 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1933 GST_STIME_ARGS (range_stop));
1934 return (stream->current_position >= range_start
1935 && stream->current_position <= range_stop);
1941 /* must be called with manifest_lock taken */
/* Tells whether seeking is possible, based on what the subclass implements:
 * live streams need the get_live_seek_range() vfunc, the final check is for
 * the seek() vfunc. */
1943 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1945 GstAdaptiveDemuxClass *klass;
1947 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1948 if (gst_adaptive_demux_is_live (demux)) {
1949 return klass->get_live_seek_range != NULL;
1952 return klass->seek != NULL;
/* Puts every stream of the input period into the RESTART state and picks its
 * start position from the demuxer segment.
 *
 * The start position defaults to 0. For a positive segment rate with a start
 * type other than NONE it is the segment start; for a negative rate with a
 * stop type other than NONE it is the segment stop. */
1956 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1957 GstSeekType start_type, GstSeekType stop_type)
1961 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1962 GstAdaptiveDemux2Stream *stream = iter->data;
1964 /* Make sure the download loop clears and restarts on the next start,
1965 * which will recompute the stream segment */
/* Streams are expected to be stopped (or already restarting) at this point */
1966 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1967 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1968 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1969 stream->start_position = 0;
1971 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1972 stream->start_position = demux->segment.start;
1973 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1974 stream->start_position = demux->segment.stop;
/* Non-zero when the seek flags f contain any of the SNAP_BEFORE, SNAP_AFTER,
 * SNAP_NEAREST, TRICKMODE_KEY_UNITS or KEY_UNIT flags. The argument is
 * parenthesized so that expressions such as (a | b) are masked as a whole. */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |          \
                                GST_SEEK_FLAG_SNAP_AFTER |           \
                                GST_SEEK_FLAG_SNAP_NEAREST |         \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS |  \
                                GST_SEEK_FLAG_KEY_UNIT))
/* The seek flags f with the SNAP_BEFORE, SNAP_AFTER and SNAP_NEAREST flags
 * cleared; all other flags are left untouched. */
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE |    \
                                      GST_SEEK_FLAG_SNAP_AFTER |     \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1988 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1991 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1995 GstSeekType start_type, stop_type;
2000 GstSegment oldsegment;
2001 GstEvent *flush_event;
2003 GST_INFO_OBJECT (demux, "Received seek event");
2005 GST_API_LOCK (demux);
2007 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2010 if (format != GST_FORMAT_TIME) {
2011 GST_API_UNLOCK (demux);
2012 GST_WARNING_OBJECT (demux,
2013 "Adaptive demuxers only support TIME-based seeking");
2014 gst_event_unref (event);
2018 if (flags & GST_SEEK_FLAG_SEGMENT) {
2019 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2020 GST_API_UNLOCK (demux);
2021 gst_event_unref (event);
2025 seqnum = gst_event_get_seqnum (event);
2027 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2028 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2032 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2033 /* For instant rate seeks, reply directly and update
2034 * our segment so the new rate is reflected in any future
2037 gdouble rate_multiplier;
2039 /* instant rate change only supported if direction does not change. All
2040 * other requirements are already checked before creating the seek event
2041 * but let's double-check here to be sure */
2042 if ((demux->segment.rate > 0 && rate < 0) ||
2043 (demux->segment.rate < 0 && rate > 0) ||
2044 start_type != GST_SEEK_TYPE_NONE ||
2045 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2046 GST_ERROR_OBJECT (demux,
2047 "Instant rate change seeks only supported in the "
2048 "same direction, without flushing and position change");
2049 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2050 GST_API_UNLOCK (demux);
2054 rate_multiplier = rate / demux->segment.rate;
2056 ev = gst_event_new_instant_rate_change (rate_multiplier,
2057 (GstSegmentFlags) flags);
2058 gst_event_set_seqnum (ev, seqnum);
2060 ret = gst_adaptive_demux_push_src_event (demux, ev);
2063 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2064 demux->instant_rate_multiplier = rate_multiplier;
2065 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2068 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2069 GST_API_UNLOCK (demux);
2070 gst_event_unref (event);
2075 if (!gst_adaptive_demux_can_seek (demux)) {
2076 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2078 GST_API_UNLOCK (demux);
2079 gst_event_unref (event);
2083 /* We can only accept flushing seeks from this point onward */
2084 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2085 GST_ERROR_OBJECT (demux,
2086 "Non-flushing non-instant-rate seeks are not possible");
2088 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2090 GST_API_UNLOCK (demux);
2091 gst_event_unref (event);
2095 if (gst_adaptive_demux_is_live (demux)) {
2096 gint64 range_start, range_stop;
2097 gboolean changed = FALSE;
2098 gboolean start_valid = TRUE, stop_valid = TRUE;
2100 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2102 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2103 GST_API_UNLOCK (demux);
2104 gst_event_unref (event);
2105 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2109 GST_DEBUG_OBJECT (demux,
2110 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2111 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2113 /* Handle relative positioning for live streams (relative to the range_stop) */
2114 if (start_type == GST_SEEK_TYPE_END) {
2115 start = range_stop + start;
2116 start_type = GST_SEEK_TYPE_SET;
2119 if (stop_type == GST_SEEK_TYPE_END) {
2120 stop = range_stop + stop;
2121 stop_type = GST_SEEK_TYPE_SET;
2125 /* Adjust the requested start/stop position if it falls beyond the live
2127 * The only case where we don't adjust is for the starting point of
2128 * an accurate seek (start if forward and stop if backwards)
2130 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2131 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2132 GST_DEBUG_OBJECT (demux,
2133 "seek before live stream start, setting to range start: %"
2134 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2135 start = range_start;
2138 /* truncate stop position also if set */
2139 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2140 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2141 GST_DEBUG_OBJECT (demux,
2142 "seek ending after live start, adjusting to: %"
2143 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2148 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2149 (start < range_start || start > range_stop)) {
2150 GST_WARNING_OBJECT (demux,
2151 "Seek to invalid position start:%" GST_STIME_FORMAT
2152 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2153 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2154 GST_STIME_ARGS (range_stop));
2155 start_valid = FALSE;
2157 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2158 (stop < range_start || stop > range_stop)) {
2159 GST_WARNING_OBJECT (demux,
2160 "Seek to invalid position stop:%" GST_STIME_FORMAT
2161 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2162 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2163 GST_STIME_ARGS (range_stop));
2167 /* If the seek position is still outside of the seekable range, refuse the seek */
2168 if (!start_valid || !stop_valid) {
2169 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2170 GST_API_UNLOCK (demux);
2171 gst_event_unref (event);
2175 /* Re-create seek event with changed/updated values */
2177 gst_event_unref (event);
2179 gst_event_new_seek (rate, format, flags,
2180 start_type, start, stop_type, stop);
2181 gst_event_set_seqnum (event, seqnum);
2185 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2187 /* have a backup in case seek fails */
2188 gst_segment_copy_into (&demux->segment, &oldsegment);
2190 GST_DEBUG_OBJECT (demux, "sending flush start");
2191 flush_event = gst_event_new_flush_start ();
2192 gst_event_set_seqnum (flush_event, seqnum);
2194 gst_adaptive_demux_push_src_event (demux, flush_event);
2196 gst_adaptive_demux_stop_tasks (demux, FALSE);
2197 gst_adaptive_demux_reset_tracks (demux);
2199 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2202 * Handle snap seeks as follows:
2203 * 1) do the snap seeking a (random) active stream
2204 * 2) use the final position on this stream to seek
2205 * on the other streams to the same position
2207 * We can't snap at all streams at the same time as they might end in
2208 * different positions, so just pick one and align all others to that
2212 GstAdaptiveDemux2Stream *stream = NULL;
2214 /* Pick a random active stream on which to do the stream seek */
2215 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2216 GstAdaptiveDemux2Stream *cand = iter->data;
2217 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2223 if (stream && IS_SNAP_SEEK (flags)) {
2224 GstClockTimeDiff ts;
2225 GstSeekFlags stream_seek_flags = flags;
2227 /* snap-seek on the chosen stream and then
2228 * use the resulting position to seek on all streams */
2230 if (start_type != GST_SEEK_TYPE_NONE)
2233 ts = gst_segment_position_from_running_time (&demux->segment,
2234 GST_FORMAT_TIME, demux->priv->global_output_position);
2235 start_type = GST_SEEK_TYPE_SET;
2238 if (stop_type != GST_SEEK_TYPE_NONE)
2241 stop_type = GST_SEEK_TYPE_SET;
2242 ts = gst_segment_position_from_running_time (&demux->segment,
2243 GST_FORMAT_TIME, demux->priv->global_output_position);
2247 if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2248 ts, &ts) != GST_FLOW_OK) {
2249 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2251 GST_API_UNLOCK (demux);
2252 gst_event_unref (event);
2256 /* replace event with a new one without snapping to seek on all streams */
2257 gst_event_unref (event);
2264 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2265 start_type, start, stop_type, stop);
2266 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2269 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2270 start, stop_type, stop, &update);
2273 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2275 ret = demux_class->seek (demux, event);
2279 /* Is there anything else we can do if it fails? */
2280 gst_segment_copy_into (&oldsegment, &demux->segment);
2282 demux->priv->segment_seqnum = seqnum;
2284 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2286 /* Resetting flow combiner */
2287 gst_flow_combiner_reset (demux->priv->flowcombiner);
2289 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2290 flush_event = gst_event_new_flush_stop (TRUE);
2291 gst_event_set_seqnum (flush_event, seqnum);
2292 gst_adaptive_demux_push_src_event (demux, flush_event);
2294 /* If the seek generated a new period, prepare it */
2295 if (!demux->input_period->prepared) {
2296 /* This can only happen on flushing seeks */
2297 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2298 gst_adaptive_demux_seek_to_input_period (demux);
2301 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2302 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2304 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2305 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2307 /* Reset the global output position (running time) for when the output loop restarts */
2308 demux->priv->global_output_position = 0;
2310 /* After a flushing seek, any instant-rate override is undone */
2311 demux->instant_rate_multiplier = 1.0;
2313 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2315 /* Restart the demux */
2316 gst_adaptive_demux_start_tasks (demux);
2318 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2319 GST_API_UNLOCK (demux);
2320 gst_event_unref (event);
2325 /* Returns TRUE if the stream has at least one selected track.
 *
 * Walks the stream->tracks list and tests the 'selected' flag of each
 * GstAdaptiveDemuxTrack found there. */
2327 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2332 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2333 GstAdaptiveDemuxTrack *track = tmp->data;
2335 if (track->selected)
/* Applies a requested stream selection to the tracks of the output period.
 *
 * @demux: the adaptive demuxer
 * @streams: list whose elements are read as stream-id strings (gchar *)
 *
 * The scheduler lock is taken first (the call is checked for failure) and
 * then the TRACKS lock. Every requested stream id is resolved to a track
 * with find_track_for_stream_id() on demux->output_period. Afterwards the
 * selected/draining flags of all output-period tracks are updated and the
 * streams are started or stopped so that exactly the streams with selected
 * tracks are running. The seqnum is then stored atomically in
 * priv->requested_selection_seqnum.
 *
 * Returns: selection_handled. It is set to FALSE as soon as one requested
 * stream id has no matching track; that path jumps straight to
 * select_streams_done, so no track flag is modified, but buffering is still
 * updated and posted. */
2343 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2346 gboolean selection_handled = TRUE;
2348 GList *tracks = NULL;
2350 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2353 TRACKS_LOCK (demux);
2354 /* Validate the streams and fill:
2355 * tracks : list of tracks corresponding to requested streams
2357 for (iter = streams; iter; iter = iter->next) {
2358 gchar *stream_id = (gchar *) iter->data;
2359 GstAdaptiveDemuxTrack *track;
2361 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2362 track = find_track_for_stream_id (demux->output_period, stream_id);
2364 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2365 selection_handled = FALSE;
2366 goto select_streams_done;
2368 tracks = g_list_append (tracks, track);
2369 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2372 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2373 * SCHEDULING THREAD */
2375 /* FIXME: We want to iterate all streams, mark them as deselected,
2376 * then iterate tracks and mark any streams that have at least 1
2377 * active output track, then loop over all streams again and start/stop
2380 /* Go over all tracks present and (de)select based on current selection */
2381 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2382 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2384 if (track->selected && !g_list_find (tracks, track)) {
2385 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2386 track->stream_id, track->active);
2387 track->selected = FALSE;
2388 track->draining = TRUE;
2389 } else if (!track->selected && g_list_find (tracks, track)) {
2390 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2392 track->selected = TRUE;
2396 /* Start or stop streams based on the updated track selection */
2397 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2398 GstAdaptiveDemux2Stream *stream = iter->data;
2401 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2402 gboolean should_be_running =
2403 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2405 if (!is_running && should_be_running) {
2406 GstClockTime output_running_ts = demux->priv->global_output_position;
2407 GstClockTime start_position;
2409 /* Calculate where we should start the stream, and then
2411 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2413 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2414 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2415 GST_TIME_ARGS (output_running_ts), &demux->segment);
2418 gst_segment_position_from_running_time (&demux->segment,
2419 GST_FORMAT_TIME, output_running_ts);
2421 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2423 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2424 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2426 stream->current_position = stream->start_position = start_position;
2427 stream->compute_segment = TRUE;
2429 /* If output has already begun, ensure we seek this segment
2430 * to the correct restart position when the download loop begins */
2431 if (output_running_ts != 0)
2432 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2434 /* Activate track pads for this stream */
2435 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2436 GstAdaptiveDemuxTrack *track =
2437 (GstAdaptiveDemuxTrack *) trackiter->data;
2438 gst_pad_set_active (track->sinkpad, TRUE);
2441 gst_adaptive_demux2_stream_start (stream);
2442 } else if (is_running && !should_be_running) {
2443 /* Stream should not be running and needs stopping */
2444 gst_adaptive_demux2_stream_stop (stream);
2446 /* Set all track sinkpads to inactive for this stream */
2447 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2448 GstAdaptiveDemuxTrack *track =
2449 (GstAdaptiveDemuxTrack *) trackiter->data;
2450 gst_pad_set_active (track->sinkpad, FALSE);
/* Record the seqnum of the selection that has just been applied */
2455 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2457 select_streams_done:
2458 demux_update_buffering_locked (demux);
2459 demux_post_buffering_locked (demux);
2461 TRACKS_UNLOCK (demux);
2462 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
/* Only the list nodes are released here, not the tracks they point to */
2465 g_list_free (tracks);
2466 return selection_handled;
/* Handles a select-streams event.
 *
 * The event seqnum is first compared with priv->requested_selection_seqnum;
 * a match is logged as "already handled/handling". The requested stream-id
 * list is extracted with gst_event_parse_select_streams() and handed to
 * handle_stream_selection() together with the event seqnum. The parsed list
 * is released with g_list_free_full (..., g_free) and the event is unreffed
 * before selection_handled is returned. */
2470 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2474 gboolean selection_handled;
2476 if (GST_EVENT_SEQNUM (event) ==
2477 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2478 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2479 GST_EVENT_SEQNUM (event));
2483 gst_event_parse_select_streams (event, &streams);
2485 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2486 g_list_free_full (streams, g_free);
2488 gst_event_unref (event);
2489 return selection_handled;
/* Event function dispatching on event->type (@parent is cast to the demuxer).
 *
 *  - SEEK: when the seqnum equals priv->segment_seqnum the event is logged
 *    as a duplicate and unreffed; otherwise it is passed to
 *    gst_adaptive_demux_handle_seek_event().
 *  - LATENCY: the event is unreffed here.
 *  - QOS: priv->qos_earliest_time is updated under the object lock, but only
 *    when it was invalid or the newly computed value is later.
 *  - SELECT_STREAMS: passed to
 *    gst_adaptive_demux_handle_select_streams_event().
 *
 * Events that are not returned from one of the cases end up in
 * gst_pad_event_default(). */
2493 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2496 GstAdaptiveDemux *demux;
2498 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2500 switch (event->type) {
2501 case GST_EVENT_SEEK:
2503 guint32 seqnum = gst_event_get_seqnum (event);
2504 if (seqnum == demux->priv->segment_seqnum) {
2505 GST_LOG_OBJECT (pad,
2506 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2507 gst_event_unref (event);
2510 return gst_adaptive_demux_handle_seek_event (demux, event);
2512 case GST_EVENT_LATENCY:{
2513 /* Upstream and our internal source are irrelevant
2514 * for latency, and we should not fail here to
2515 * configure the latency */
2516 gst_event_unref (event);
2519 case GST_EVENT_QOS:{
2520 GstClockTimeDiff diff;
2521 GstClockTime timestamp;
2522 GstClockTime earliest_time;
/* NOTE(review): the last argument below reads '×tamp'; this looks like a
 * mis-encoded '&timestamp' (the declared local above) - restore it from the
 * pristine file before building. */
2524 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2525 /* Only take into account lateness if late */
2527 earliest_time = timestamp + 2 * diff;
2529 earliest_time = timestamp;
2531 GST_OBJECT_LOCK (demux);
2532 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2533 earliest_time > demux->priv->qos_earliest_time) {
2534 demux->priv->qos_earliest_time = earliest_time;
2535 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2536 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2538 GST_OBJECT_UNLOCK (demux);
2541 case GST_EVENT_SELECT_STREAMS:
2543 return gst_adaptive_demux_handle_select_streams_event (demux, event);
2549 return gst_pad_event_default (pad, parent, event);
/* Query function dispatching on query->type (@parent is cast to the demuxer).
 *
 *  - DURATION: live streams answer with -1 (unknown). Otherwise, for
 *    GST_FORMAT_TIME and once a manifest is available, priv->duration is
 *    read under the manifest lock and reported when it is valid and > 0.
 *  - LATENCY: answered with live=FALSE, min=0, max=-1.
 *  - SEEKING: returns FALSE while no manifest is available. For
 *    GST_FORMAT_TIME the seekable flag comes from
 *    gst_adaptive_demux_can_seek(); the range comes from
 *    gst_adaptive_demux_get_live_seek_range() for live streams and from
 *    priv->duration otherwise.
 *  - manifest URI: demux->manifest_uri, when set, is written into the query
 *    under the manifest lock.
 *  - SELECTABLE: answered with TRUE.
 *
 * ret starts out FALSE. */
2553 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2556 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2557 gboolean ret = FALSE;
2563 switch (query->type) {
2564 case GST_QUERY_DURATION:{
2566 GstClockTime duration = GST_CLOCK_TIME_NONE;
2568 gst_query_parse_duration (query, &fmt, NULL);
2570 if (gst_adaptive_demux_is_live (demux)) {
2571 /* We are able to answer this query: the duration is unknown */
2572 gst_query_set_duration (query, fmt, -1);
2577 if (fmt == GST_FORMAT_TIME
2578 && g_atomic_int_get (&demux->priv->have_manifest)) {
2580 GST_MANIFEST_LOCK (demux);
2581 duration = demux->priv->duration;
2582 GST_MANIFEST_UNLOCK (demux);
2584 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2585 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2590 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2591 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2594 case GST_QUERY_LATENCY:{
2595 gst_query_set_latency (query, FALSE, 0, -1);
2599 case GST_QUERY_SEEKING:{
2604 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2605 GST_INFO_OBJECT (demux,
2606 "Don't have manifest yet, can't answer seeking query");
2607 return FALSE; /* can't answer without manifest */
2610 GST_MANIFEST_LOCK (demux);
2612 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2613 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2614 if (fmt == GST_FORMAT_TIME) {
2615 GstClockTime duration;
2616 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2620 if (gst_adaptive_demux_is_live (demux)) {
2621 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2623 GST_MANIFEST_UNLOCK (demux);
2624 GST_INFO_OBJECT (demux, "can't answer seeking query");
2628 duration = demux->priv->duration;
2629 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2633 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2634 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2635 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2636 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2638 GST_MANIFEST_UNLOCK (demux);
2643 GST_MANIFEST_LOCK (demux);
2645 /* TODO HLS can answer this differently it seems */
2646 if (demux->manifest_uri) {
2647 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2648 * playlist or the uri of the last downloaded fragment? */
2649 gst_query_set_uri (query, demux->manifest_uri);
2653 GST_MANIFEST_UNLOCK (demux);
2655 case GST_QUERY_SELECTABLE:
2656 gst_query_set_selectable (query, TRUE);
2660 /* Don't forward queries upstream because of the special nature of this
2661 * "demuxer", which relies on the upstream element only to be fed
/* Reacts to a loss of synchronization by seeking back to the live head.
 *
 * Builds a seek event at rate 1.0 in GST_FORMAT_TIME with the FLUSH and
 * KEY_UNIT flags, start type GST_SEEK_TYPE_END with offset 0 and no stop
 * position, and passes it to gst_adaptive_demux_handle_seek_event(). */
2671 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2675 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2678 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2679 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2680 GST_SEEK_TYPE_NONE, 0);
2681 gst_adaptive_demux_handle_seek_event (demux, seek);
2686 /* Called when the scheduler starts, to kick off manifest updates
2687 * and stream downloads.
 *
 * Iterates the streams of demux->input_period and starts every stream that
 * either has pending_tracks set or is reported as selected by
 * gst_adaptive_demux2_stream_is_selected_locked(). */
2689 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2693 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2695 iter = demux->input_period->streams;
2697 for (; iter; iter = g_list_next (iter)) {
2698 GstAdaptiveDemux2Stream *stream = iter->data;
2700 /* If we need to process this stream to discover tracks *OR* it has any
2701 * tracks which are selected, start it now */
2702 if ((stream->pending_tracks == TRUE)
2703 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2704 gst_adaptive_demux2_stream_start (stream);
2710 /* must be called with manifest_lock taken
 *
 * When gst_adaptive_demux2_is_running() reports FALSE this is logged as
 * "Not starting tasks due to shutdown". For the normal path,
 * gst_adaptive_demux_scheduler_start_cb() is queued on the scheduler task
 * and then, under the TRACKS lock, priv->flushing is cleared and the output
 * task is started. */
2712 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2714 if (!gst_adaptive_demux2_is_running (demux)) {
2715 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2719 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2720 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2721 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2723 TRACKS_LOCK (demux);
2724 demux->priv->flushing = FALSE;
2725 GST_DEBUG_OBJECT (demux, "Starting the output task");
2726 gst_task_start (demux->priv->output_task);
2727 TRACKS_UNLOCK (demux);
2730 /* must be called with manifest_lock taken
 *
 * If a manifest-update callback is registered (priv->manifest_updates_cb is
 * non-zero) it is cancelled on the scheduler task and the stored id is reset
 * to 0, so a repeated call is harmless. */
2732 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2734 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2735 if (demux->priv->manifest_updates_cb != 0) {
2736 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2737 demux->priv->manifest_updates_cb);
2738 demux->priv->manifest_updates_cb = 0;
/* Forward declaration: the callback is defined further down but is already
 * needed as the GSourceFunc scheduled by the function below. */
2742 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2744 /* must be called with manifest_lock taken
 *
 * Schedules gst_adaptive_demux_updates_start_cb() on the scheduler task when
 * all of the following hold: the stream is live, the subclass'
 * requires_periodical_playlist_update() returns TRUE, and no update callback
 * is registered yet (priv->manifest_updates_cb == 0). The returned id is
 * kept in priv->manifest_updates_cb. */
2746 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2748 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2750 if (gst_adaptive_demux_is_live (demux)) {
2751 /* Task to periodically update the manifest */
2752 if (demux_class->requires_periodical_playlist_update (demux)) {
2753 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2754 if (demux->priv->manifest_updates_cb == 0) {
2755 demux->priv->manifest_updates_cb =
2756 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2757 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
/* Stops the demuxer's tasks.
 *
 * @stop_updates: NOTE(review): presumably decides whether the manifest
 * update task is stopped as well - the guarding condition is not shown here,
 * confirm.
 *
 * Under the TRACKS lock the tasks of the input period (if there is one) are
 * stopped, priv->flushing is set, the tracks_add condition is signalled and
 * the output task is asked to stop. The output task is joined only after the
 * TRACKS lock has been released. Finally priv->qos_earliest_time is reset to
 * GST_CLOCK_TIME_NONE. */
2763 /* must be called with manifest_lock taken
2764 * This function will temporarily release manifest_lock in order to join the
2766 * The api_lock will still protect it against other threads trying to modify
2767 * the demux element.
2770 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2772 GST_LOG_OBJECT (demux, "Stopping tasks");
2775 gst_adaptive_demux_stop_manifest_update_task (demux);
2777 TRACKS_LOCK (demux);
2778 if (demux->input_period)
2779 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2781 demux->priv->flushing = TRUE;
2782 g_cond_signal (&demux->priv->tracks_add);
2783 gst_task_stop (demux->priv->output_task);
2784 TRACKS_UNLOCK (demux);
2786 gst_task_join (demux->priv->output_task);
2788 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2791 /* must be called with manifest_lock taken
 *
 * Pushes @event on the pad of every output slot in priv->outputs while
 * holding the TRACKS lock. An extra reference is taken for each push and the
 * reference passed in is dropped at the end, so the caller gives up its
 * reference to @event. The per-pad results are AND-ed into ret, which starts
 * out TRUE. For FLUSH_STOP events each slot's pushed_timed_data flag is
 * cleared. */
2793 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2796 gboolean ret = TRUE;
2798 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2800 TRACKS_LOCK (demux);
2801 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2802 OutputSlot *slot = (OutputSlot *) iter->data;
2803 gst_event_ref (event);
2804 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2805 ret = ret & gst_pad_push_event (slot->pad, event);
2806 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2807 slot->pushed_timed_data = FALSE;
2809 TRACKS_UNLOCK (demux);
2810 gst_event_unref (event);
2814 /* must be called with manifest_lock taken
 *
 * Stores @caps as stream->pending_caps through gst_caps_replace() and then
 * drops the reference that was passed in, i.e. the caller hands over its
 * reference to @caps. */
2816 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2819 GST_DEBUG_OBJECT (stream,
2820 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2821 gst_caps_replace (&stream->pending_caps, caps);
2822 gst_caps_unref (caps);
2825 /* must be called with manifest_lock taken
 *
 * Replaces stream->pending_tags with @tags. A previously pending tag list
 * is unreffed first; @tags is stored as-is without taking an additional
 * reference. */
2827 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2830 GST_DEBUG_OBJECT (stream,
2831 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2832 if (stream->pending_tags) {
2833 gst_tag_list_unref (stream->pending_tags);
2835 stream->pending_tags = tags;
2838 /* must be called with manifest_lock taken
 *
 * Appends @event to the end of stream->pending_events; no extra reference
 * is taken here. */
2840 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2843 stream->pending_events = g_list_append (stream->pending_events, event);
/* NOTE(review): judging by its name this is the default implementation
 * behind the requires_periodical_playlist_update class hook that
 * gst_adaptive_demux_start_manifest_update_task() consults; confirm against
 * the class initialisation before relying on that. */
2847 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2853 /* Called when a stream needs waking after the manifest is updated.
 *
 * Only raises priv->stream_waiting_for_manifest; the flag is read and
 * cleared again by gst_adaptive_demux_manifest_update_cb() after a
 * successful manifest update. */
2855 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2857 demux->priv->stream_waiting_for_manifest = TRUE;
/* Scheduler callback that refreshes the manifest of a live stream.
 *
 * Takes the manifest lock and clears priv->manifest_updates_cb first.
 * Non-live streams, and a GST_FLOW_EOS result from
 * gst_adaptive_demux_update_manifest(), end the callback without
 * re-arming it.
 *
 * On GST_FLOW_OK the failure counter is reset and, if a stream flagged
 * itself as waiting, every stream of the input period is notified through
 * gst_adaptive_demux2_stream_on_manifest_update() and the flag is cleared.
 * For failed updates the counter is incremented: up to DEFAULT_FAILED_COUNT
 * failures only produce a warning, after that an element error is posted and
 * schedule_again is cleared. GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC additionally
 * triggers gst_adaptive_demux_handle_lost_sync().
 *
 * While schedule_again is TRUE the callback re-arms itself as a delayed call
 * using the subclass' get_manifest_update_interval() multiplied by
 * GST_USECOND.
 *
 * Returns: G_SOURCE_REMOVE on every visible path; repetition is achieved by
 * the explicit re-arming above. */
2861 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2863 GstFlowReturn ret = GST_FLOW_OK;
2864 gboolean schedule_again = TRUE;
2866 GST_MANIFEST_LOCK (demux);
2867 demux->priv->manifest_updates_cb = 0;
2869 /* Updating playlist only needed for live playlists */
2870 if (!gst_adaptive_demux_is_live (demux)) {
2871 GST_MANIFEST_UNLOCK (demux);
2872 return G_SOURCE_REMOVE;
2875 GST_DEBUG_OBJECT (demux, "Updating playlist");
2876 ret = gst_adaptive_demux_update_manifest (demux);
2878 if (ret == GST_FLOW_EOS) {
2879 GST_MANIFEST_UNLOCK (demux);
2880 return G_SOURCE_REMOVE;
2883 if (ret == GST_FLOW_OK) {
2884 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2885 demux->priv->update_failed_count = 0;
2887 /* Wake up download tasks */
2888 if (demux->priv->stream_waiting_for_manifest) {
2891 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2892 GstAdaptiveDemux2Stream *stream = iter->data;
2893 gst_adaptive_demux2_stream_on_manifest_update (stream);
2895 demux->priv->stream_waiting_for_manifest = FALSE;
2898 demux->priv->update_failed_count++;
2900 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2901 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2902 gst_flow_get_name (ret));
2904 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2905 (_("Internal data stream error.")), ("Could not update playlist"));
2906 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2907 schedule_again = FALSE;
2911 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2912 gst_adaptive_demux_handle_lost_sync (demux);
2914 if (schedule_again) {
2915 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2917 demux->priv->manifest_updates_cb =
2918 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2919 klass->get_manifest_update_interval (demux) * GST_USECOND,
2920 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2923 GST_MANIFEST_UNLOCK (demux);
2925 return G_SOURCE_REMOVE;
/* Scheduler callback that arms the first periodic manifest update.
 *
 * Registers gst_adaptive_demux_manifest_update_cb() as a delayed call on the
 * scheduler task, using the subclass' get_manifest_update_interval()
 * multiplied by GST_USECOND as the delay, and remembers the id in
 * priv->manifest_updates_cb.
 *
 * Returns: G_SOURCE_REMOVE, this callback itself runs only once. */
2929 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2931 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2933 /* Loop for updating of the playlist. This periodically checks if
2934 * the playlist is updated and does so, then signals the streaming
2935 * thread in case it can continue downloading now. */
2937 /* block until the next scheduled update or the signal to quit this thread */
2938 GST_DEBUG_OBJECT (demux, "Started updates task");
2939 demux->priv->manifest_updates_cb =
2940 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2941 klass->get_manifest_update_interval (demux) * GST_USECOND,
2942 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2944 return G_SOURCE_REMOVE;
/* Scans priv->outputs for an existing output slot that @track could take
 * over. For each slot the checks are, in order: the slot type against
 * track->type, whether the slot's pending track already is @track, whether
 * the slot is reserved for some other pending track, and whether the slot's
 * current track is draining. */
2948 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2949 GstAdaptiveDemuxTrack * track)
2953 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2954 OutputSlot *slot = (OutputSlot *) tmp->data;
2955 /* Incompatible output type */
2956 if (slot->type != track->type)
2959 /* Slot which is already assigned to this pending track */
2960 if (slot->pending_track == track)
2963 /* slot already used for another pending track */
2964 if (slot->pending_track != NULL)
2967 /* Current output track is of the same type and is draining */
2968 if (slot->track && slot->track->draining)
2975 /* TRACKS_LOCK taken
 *
 * Searches priv->outputs for the slot whose current track (slot->track) is
 * @track; pending tracks are not considered. */
2977 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
2981 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2982 OutputSlot *slot = (OutputSlot *) tmp->data;
2984 if (slot->track == track)
2991 /* TRACKS_LOCK held
 *
 * First checks the tracks of the output period for one that is selected but
 * not active. Then builds a streams-selected message for the output
 * period's collection, stamps it with @seqnum and adds the stream object of
 * every active track. */
2993 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
2998 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
2999 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3001 if (track->selected && !track->active)
3005 /* All selected tracks are active, create the message */
3007 gst_message_new_streams_selected (GST_OBJECT (demux),
3008 demux->output_period->collection);
3009 GST_MESSAGE_SEQNUM (msg) = seqnum;
3010 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3011 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3012 if (track->active) {
3013 gst_message_streams_selected_add (msg, track->stream_object);
/* Pushes the initial events for the track currently assigned to a slot.
 *
 * A stream-start event is created from track->stream_id, given the demuxer's
 * group id when demux->have_group_id is set, plus the track's stream flags
 * and stream object, and pushed on slot->pad. It is followed by a
 * stream-collection event for the output period's collection. Finally all
 * of the track's stored sticky events are marked undelivered so that they
 * get sent again. */
3021 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3024 GstAdaptiveDemuxTrack *track = slot->track;
3027 /* Send EVENT_STREAM_START */
3028 event = gst_event_new_stream_start (track->stream_id);
3029 if (demux->have_group_id)
3030 gst_event_set_group_id (event, demux->group_id);
3031 gst_event_set_stream_flags (event, track->flags);
3032 gst_event_set_stream (event, track->stream_object);
3033 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3035 gst_pad_push_event (slot->pad, event);
3037 /* Send EVENT_STREAM_COLLECTION */
3038 event = gst_event_new_stream_collection (demux->output_period->collection);
3039 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3041 gst_pad_push_event (slot->pad, event);
3043 /* Mark all sticky events for re-sending */
3044 gst_event_store_mark_all_undelivered (&track->sticky_events);
/* Brings the output slots in line with the requested track selection.
 *
 * Compares priv->requested_selection_seqnum (read atomically) with
 * priv->current_selection_seqnum; when they differ the slots are re-checked
 * in three passes:
 *  1. pending tracks that are no longer selected are dropped from their slot;
 *  2. every selected track of the output period either keeps its slot,
 *     becomes the pending track of a compatible draining slot, or gets a
 *     newly created slot (and its initial events);
 *  3. slots whose track is draining and that have no pending track are
 *     removed and freed, flushing the track if its stream is found and is
 *     not running.
 * Afterwards current_selection_seqnum is updated and a streams-selected
 * message, when one was built, is posted with the TRACKS lock temporarily
 * released. */
3048 * Called with TRACKS_LOCK taken
3051 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3054 guint requested_selection_seqnum;
3057 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3058 output slots vs active/draining tracks */
3059 requested_selection_seqnum =
3060 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3062 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3065 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3067 /* Go over all slots, and if they have a pending track that's no longer
3068 * selected, clear it so the slot can be reused */
3069 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3070 OutputSlot *slot = (OutputSlot *) tmp->data;
3072 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3073 GST_DEBUG_OBJECT (demux,
3074 "Removing deselected track '%s' as pending from output of current track '%s'",
3075 slot->pending_track->stream_id, slot->track->stream_id);
3076 gst_adaptive_demux_track_unref (slot->pending_track);
3077 slot->pending_track = NULL;
3081 /* Go over all tracks and create/re-assign/remove slots */
3082 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3083 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3085 if (track->selected) {
3086 OutputSlot *slot = find_slot_for_track (demux, track);
3088 /* 0. Track is selected and has a slot. Nothing to do */
3090 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3095 slot = find_replacement_slot_for_track (demux, track);
3097 /* 1. There is an existing slot of the same type which is currently
3098 * draining, assign this track as a replacement for it */
3099 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3100 if (slot->pending_track == NULL) {
3101 slot->pending_track = gst_adaptive_demux_track_ref (track);
3102 GST_DEBUG_OBJECT (demux,
3103 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3104 track->stream_id, track->period_num,
3105 slot->track->stream_id, slot->track->period_num);
3108 /* 2. There is no compatible replacement slot, create a new one */
3109 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3110 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3112 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3114 track->update_next_segment = TRUE;
3116 slot->track = gst_adaptive_demux_track_ref (track);
3117 track->active = TRUE;
3118 gst_adaptive_demux_send_initial_events (demux, slot);
3121 /* If we were draining this track, we no longer are */
3122 track->draining = FALSE;
3126 /* Finally check all slots have a current/pending track. If not remove it */
3127 for (tmp = demux->priv->outputs; tmp;) {
3128 OutputSlot *slot = (OutputSlot *) tmp->data;
3129 /* We should never have slots without target tracks */
3130 g_assert (slot->track);
3131 if (slot->track->draining && !slot->pending_track) {
3132 GstAdaptiveDemux2Stream *stream;
3134 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3135 slot->track->stream_id);
3136 slot->track->active = FALSE;
3138 /* If the stream feeding this track is stopped, flush and clear
3139 * the track now that it's going inactive. If the stream was not
3140 * found, it means we advanced past that period already (and the
3141 * stream was stopped and discarded) */
3142 stream = find_stream_for_track_locked (demux, slot->track);
3143 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3144 gst_adaptive_demux_track_flush (slot->track);
/* Removing the slot restarts the walk from the new list head */
3146 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3147 gst_adaptive_demux_output_slot_free (demux, slot);
3152 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3153 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
/* The TRACKS lock is dropped while the message is posted and re-taken
 * afterwards */
3155 TRACKS_UNLOCK (demux);
3156 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3157 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3158 TRACKS_LOCK (demux);
3162 /* TRACKS_LOCK held
 *
 * Switches the output to the next period in priv->periods.
 *
 * The head of the queue (the previous period) is popped and the reference
 * held through demux->output_period is dropped; the new queue head is
 * reffed and becomes demux->output_period. Its stream collection is posted
 * as a message with the TRACKS lock temporarily released. All tracks of the
 * previous period that were selected are deselected and marked draining,
 * requested_selection_seqnum is incremented to force
 * check_and_handle_selection_update_locked() to re-evaluate the slots, and
 * finally the last reference on the previous period is released. */
3164 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3167 GstAdaptiveDemuxPeriod *previous_period;
3168 GstStreamCollection *collection;
3170 /* Grab the next period, should be demux->periods->next->data */
3171 previous_period = g_queue_pop_head (demux->priv->periods);
3173 /* Remove ref held by demux->output_period */
3174 gst_adaptive_demux_period_unref (previous_period);
3175 demux->output_period =
3176 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3178 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3179 demux->output_period->period_num);
3181 /* We can now post the collection of the new period */
3182 collection = demux->output_period->collection;
3183 TRACKS_UNLOCK (demux);
3184 gst_element_post_message (GST_ELEMENT_CAST (demux),
3185 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3186 TRACKS_LOCK (demux);
3188 /* Unselect all tracks of the previous period */
3189 for (iter = previous_period->tracks; iter; iter = iter->next) {
3190 GstAdaptiveDemuxTrack *track = iter->data;
3191 if (track->selected) {
3192 track->selected = FALSE;
3193 track->draining = TRUE;
3197 /* Force a selection re-check */
3198 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3199 check_and_handle_selection_update_locked (demux);
3201 /* Remove the final ref on the previous period now that we have done the switch */
3202 gst_adaptive_demux_period_unref (previous_period);
3207 /* Check whether the pending (replacement) track of @slot can take over from
 * the slot's current, draining track, and perform the switch if so.
 *
 * The pending track counts as ready when its level_time has reached its
 * buffering_threshold or when it is EOS. On a switch the old track is
 * deactivated (and flushed when its feeding stream is no longer running),
 * its reference is dropped, the pending track becomes slot->track and is
 * marked active, initial events are sent, and the streams-selected message
 * obtained from all_selected_tracks_are_active() is posted.
 *
 * Called with TRACKS_LOCK taken; the lock is released while the message is
 * posted. */
3209 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3212 GstAdaptiveDemuxTrack *track = slot->track;
3214 gboolean pending_is_ready;
3215 GstAdaptiveDemux2Stream *stream;
3217 /* If we have a pending track for this slot, the current track should be
3218 * draining and no longer selected */
3219 g_assert (track->draining && !track->selected);
3221 /* If we're draining, check if the pending track has enough data *or* that
3222 we've already drained out entirely */
3224 (slot->pending_track->level_time >=
3225 slot->pending_track->buffering_threshold);
3226 pending_is_ready |= slot->pending_track->eos;
/* Replacement not ready and the old track still has queued items: the old
 * track keeps being output for now */
3228 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3229 GST_DEBUG_OBJECT (demux,
3230 "Replacement track '%s' doesn't have enough data for switching yet",
3231 slot->pending_track->stream_id);
3235 GST_DEBUG_OBJECT (demux,
3236 "Pending replacement track has enough data, switching");
3237 track->active = FALSE;
3238 track->draining = FALSE;
3240 /* If the stream feeding this track is stopped, flush and clear
3241 * the track now that it's going inactive. If the stream was not
3242 * found, it means we advanced past that period already (and the
3243 * stream was stopped and discarded) */
3244 stream = find_stream_for_track_locked (demux, track);
3245 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3246 gst_adaptive_demux_track_flush (track);
3248 gst_adaptive_demux_track_unref (track);
3249 /* We steal the reference of pending_track */
3250 track = slot->track = slot->pending_track;
3251 slot->pending_track = NULL;
3252 slot->track->active = TRUE;
3254 /* Make sure the track segment will start at the current position */
3255 track->update_next_segment = TRUE;
3257 /* Send stream start and collection, and schedule sticky events */
3258 gst_adaptive_demux_send_initial_events (demux, slot);
3260 /* Can we emit the streams-selected message now ? */
3262 all_selected_tracks_are_active (demux,
3263 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3265 TRACKS_UNLOCK (demux);
3266 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3267 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3268 TRACKS_LOCK (demux);
/* Output loop body. It pauses demux->priv->output_task itself when there is
 * nothing left to push or when a flow error occurs.
 *
 * With TRACKS_LOCK held it: bails out with GST_FLOW_FLUSHING when
 * priv->flushing is set; applies pending selection changes; optionally waits
 * on priv->tracks_add for more data; derives global_output_position as the
 * minimum next_position over all output slots; advances to the next output
 * period once every track is empty and has_next_period is set; and finally
 * dequeues events/buffers from each slot's track and pushes them on the
 * slot's pad. The lock is released while an item is pushed downstream.
 *
 * On the error path an EOS event is sent with
 * gst_adaptive_demux_push_src_event() for GST_FLOW_NOT_LINKED and for any
 * return <= GST_FLOW_EOS; everything except GST_FLOW_EOS itself is also
 * reported with GST_ELEMENT_FLOW_ERROR. */
3274 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3277 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3278 gboolean wait_for_data = FALSE;
3279 gboolean all_tracks_empty;
3282 GST_DEBUG_OBJECT (demux, "enter");
3284 TRACKS_LOCK (demux);
3286 /* Check if stopping */
3287 if (demux->priv->flushing) {
3288 ret = GST_FLOW_FLUSHING;
3292 /* If the selection changed, handle it */
3293 check_and_handle_selection_update_locked (demux);
3297 global_output_position = GST_CLOCK_STIME_NONE;
3298 all_tracks_empty = TRUE;
/* A previous pass found a non-EOS track without timed data: sleep on the
 * tracks_add condition (g_cond_wait releases tracks_lock while waiting) and
 * re-check the flushing flag on wake-up */
3300 if (wait_for_data) {
3301 GST_DEBUG_OBJECT (demux, "Waiting for data");
3302 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3303 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3304 if (demux->priv->flushing) {
3305 ret = GST_FLOW_FLUSHING;
3308 wait_for_data = FALSE;
3311 /* Grab/Recalculate current global output position
3312 * This is the minimum pending output position of all tracks used for output
3314 * If there is a track which is empty and not EOS, wait for it to receive data
3315 * then recalculate global output position.
3317 * This also pushes downstream all non-timed data that might be present.
3319 * IF all tracks are EOS : stop task
3321 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3322 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3323 OutputSlot *slot = (OutputSlot *) tmp->data;
3324 GstAdaptiveDemuxTrack *track;
3326 /* If there is a pending track, Check if it's time to switch to it */
3327 if (slot->pending_track)
3328 handle_slot_pending_track_switch_locked (demux, slot);
3330 track = slot->track;
3332 if (!track->active) {
3333 /* Note: Edward: I can't see in what cases we would end up with inactive
3334 tracks assigned to slots. */
3335 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3336 g_assert (track->active);
3340 if (track->next_position == GST_CLOCK_STIME_NONE) {
3341 gst_adaptive_demux_track_update_next_position (track);
3344 GST_TRACE_OBJECT (demux,
3345 "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3346 track->stream_id, track->period_num,
3347 GST_STIME_ARGS (track->next_position));
3349 if (track->next_position != GST_CLOCK_STIME_NONE) {
3350 if (global_output_position == GST_CLOCK_STIME_NONE)
3351 global_output_position = track->next_position;
3353 global_output_position =
3354 MIN (global_output_position, track->next_position);
3355 track->waiting_add = FALSE;
3356 all_tracks_empty = FALSE;
3357 } else if (!track->eos) {
3358 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3359 track->stream_id, track->period_num);
3360 all_tracks_empty = FALSE;
3361 wait_for_data = track->waiting_add = TRUE;
3363 GST_DEBUG_OBJECT (demux,
3364 "Track %s (period %u) is EOS, not waiting for timed data",
3365 track->stream_id, track->period_num);
3367 if (gst_queue_array_get_length (track->queue) > 0) {
3368 all_tracks_empty = FALSE;
3376 if (all_tracks_empty && demux->output_period->has_next_period) {
3377 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3378 demux->output_period->period_num);
3379 if (!gst_adaptive_demux_advance_output_period (demux)) {
3380 /* Failed to move to next period, error out */
3381 ret = GST_FLOW_ERROR;
3384 /* Restart the loop */
3388 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3389 GST_STIME_ARGS (global_output_position));
3393 * We know all active tracks have pending timed data
3394 * * while track next_position <= global output position
3395 * * push pending data
3396 * * Update track next_position
3397 * * recalculate global output position
3398 * * Pop next pending data from track and update pending position
3401 gboolean need_restart = FALSE;
3403 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3404 OutputSlot *slot = (OutputSlot *) tmp->data;
3405 GstAdaptiveDemuxTrack *track = slot->track;
3407 GST_LOG_OBJECT (track->element,
3408 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3409 " global_output_position:%" GST_STIME_FORMAT, track->active,
3410 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3411 GST_STIME_ARGS (global_output_position));
/* Keep dequeuing from this track while any of these holds: no global
 * position is known, the slot has not pushed timed data yet, the track's
 * next item is due (next_position <= global position), or the track is EOS
 * with no timed position left */
3416 while (global_output_position == GST_CLOCK_STIME_NONE
3417 || !slot->pushed_timed_data
3418 || ((track->next_position != GST_CLOCK_STIME_NONE)
3419 && track->next_position <= global_output_position)
3420 || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3421 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3424 GST_DEBUG_OBJECT (demux,
3425 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3426 track->stream_id, track->period_num, track->eos,
3427 slot->pushed_timed_data);
3428 /* This should only happen if the track is EOS, or exactly in between
3429 * the parser outputting segment/caps before buffers. */
3430 g_assert (track->eos || !slot->pushed_timed_data);
3432 /* If we drained the track, but there's a pending track on the slot
3433 * loop again to activate it */
3434 if (slot->pending_track) {
3435 GST_DEBUG_OBJECT (demux,
3436 "Track '%s' (period %u) drained, but has a pending track to activate",
3437 track->stream_id, track->period_num);
3443 demux_update_buffering_locked (demux);
3444 demux_post_buffering_locked (demux);
3445 TRACKS_UNLOCK (demux);
3447 GST_DEBUG_OBJECT (demux,
3448 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3449 track->period_num, mo);
3451 if (GST_IS_EVENT (mo)) {
3452 GstEvent *event = (GstEvent *) mo;
3453 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3454 slot->pushed_timed_data = TRUE;
3455 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3456 /* If there is a pending next period, don't send the EOS */
3457 if (demux->output_period->has_next_period) {
3458 GST_LOG_OBJECT (demux,
3459 "Dropping EOS on track '%s' (period %u) before next period",
3460 track->stream_id, track->period_num);
3461 gst_event_store_mark_delivered (&track->sticky_events, event);
3462 gst_event_unref (event);
3464 /* We'll need to re-check if all tracks are empty again above */
3465 need_restart = TRUE;
3469 if (event != NULL) {
3470 gst_pad_push_event (slot->pad, gst_event_ref (event));
3472 if (GST_EVENT_IS_STICKY (event))
3473 gst_event_store_mark_delivered (&track->sticky_events, event);
3474 gst_event_unref (event);
3476 } else if (GST_IS_BUFFER (mo)) {
3477 GstBuffer *buffer = (GstBuffer *) mo;
3479 if (track->output_discont) {
3480 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3481 buffer = gst_buffer_make_writable (buffer);
3482 GST_DEBUG_OBJECT (slot->pad,
3483 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3485 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3487 track->output_discont = FALSE;
3489 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3491 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3492 slot->pad, slot->flow_ret);
3493 GST_DEBUG_OBJECT (slot->pad,
3494 "track %s (period %u) push returned %s (combined %s)",
3495 track->stream_id, track->period_num,
3496 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3497 slot->pushed_timed_data = TRUE;
3499 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3502 TRACKS_LOCK (demux);
3503 gst_adaptive_demux_track_update_next_position (track);
3505 if (ret != GST_FLOW_OK)
3510 /* Store global output position */
3511 if (global_output_position != GST_CLOCK_STIME_NONE) {
3512 demux->priv->global_output_position = global_output_position;
3514 /* And see if any streams need to be woken for more input */
3515 gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3516 global_output_position);
3522 if (global_output_position == GST_CLOCK_STIME_NONE) {
3523 if (!demux->priv->flushing) {
3524 GST_DEBUG_OBJECT (demux,
3525 "Pausing output task after reaching NONE global_output_position");
3526 gst_task_pause (demux->priv->output_task);
3530 TRACKS_UNLOCK (demux);
3531 GST_DEBUG_OBJECT (demux, "leave");
3536 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3537 /* If the flushing flag is set, then the task is being
3538 * externally stopped, so don't go to pause(), otherwise we
3539 * should so we don't keep spinning */
3540 if (!demux->priv->flushing) {
3541 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3542 gst_flow_get_name (ret));
3543 gst_task_pause (demux->priv->output_task);
3546 TRACKS_UNLOCK (demux);
/* NOT_LINKED, EOS and every flow return below EOS end the stream: send an
 * EOS event downstream, and report anything other than plain EOS as a flow
 * error */
3548 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3549 GstEvent *eos = gst_event_new_eos ();
3551 if (ret != GST_FLOW_EOS) {
3552 GST_ELEMENT_FLOW_ERROR (demux, ret);
3555 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3556 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3557 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3558 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3560 gst_adaptive_demux_push_src_event (demux, eos);
3567 /* Reports whether the stream is live, as answered by the subclass
 * is_live() vfunc.
 *
 * Must be called from the scheduler */
3569 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3571 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3574 return klass->is_live (demux);
/* Completion callback for a manifest download request.
 *
 * Replaces demux->manifest_uri / demux->manifest_base_uri with copies taken
 * from @request (a permanent redirect makes the redirect URI the new manifest
 * URI), hands the downloaded buffer to the subclass update_manifest_data()
 * vfunc, posts a duration-changed message when the update succeeded and the
 * duration is known, and starts or stops the manifest update task depending
 * on liveness and requires_periodical_playlist_update(). */
3579 handle_manifest_download_complete (DownloadRequest * request,
3580 DownloadRequestState state, GstAdaptiveDemux * demux)
3582 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3584 GstFlowReturn result;
/* The previous URI strings are owned by the demuxer; release them before
 * storing fresh copies */
3586 g_free (demux->manifest_base_uri);
3587 g_free (demux->manifest_uri);
3589 if (request->redirect_permanent && request->redirect_uri) {
3590 demux->manifest_uri = g_strdup (request->redirect_uri);
3591 demux->manifest_base_uri = NULL;
3593 demux->manifest_uri = g_strdup (request->uri);
3594 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3597 buffer = download_request_take_buffer (request);
3599 /* We should always have a buffer since this function is the non-error
3600 * callback for the download */
3603 result = klass->update_manifest_data (demux, buffer);
3604 gst_buffer_unref (buffer);
3606 /* FIXME: Should the manifest uri vars be reverted to original
3607 * values if updating fails? */
3609 if (result == GST_FLOW_OK) {
3610 GstClockTime duration;
3611 /* Send an updated duration message */
3612 duration = klass->get_duration (demux);
3613 if (duration != GST_CLOCK_TIME_NONE) {
3614 GST_DEBUG_OBJECT (demux,
3615 "Sending duration message : %" GST_TIME_FORMAT,
3616 GST_TIME_ARGS (duration));
3617 gst_element_post_message (GST_ELEMENT (demux),
3618 gst_message_new_duration_changed (GST_OBJECT (demux)));
3620 GST_DEBUG_OBJECT (demux,
3621 "Duration unknown, can not send the duration message");
3624 /* If a manifest changes its liveness or whether it needs periodic
3625 * updates, we need to start/stop the manifest update task appropriately */
3626 /* Keep this condition in sync with the one in
3627 * gst_adaptive_demux_start_manifest_update_task()
3629 if (gst_adaptive_demux_is_live (demux) &&
3630 klass->requires_periodical_playlist_update (demux)) {
3631 gst_adaptive_demux_start_manifest_update_task (demux);
3633 gst_adaptive_demux_stop_manifest_update_task (demux);
3639 handle_manifest_download_failure (DownloadRequest * request,
3640 DownloadRequestState state, GstAdaptiveDemux * demux)
3642 GST_FIXME_OBJECT (demux, "Manifest download failed.");
3643 /* Failure callback for the manifest download: the visible body only logs
 * the failure. TODO: retry or error out here */
/* Build a download request for demux->manifest_uri, attach
 * handle_manifest_download_complete() / handle_manifest_download_failure()
 * as its callbacks, and submit it to demux->download_helper with the
 * COMPRESS and FORCE_REFRESH flags.
 *
 * ret starts out as GST_FLOW_OK; when the submission is rejected an element
 * warning carrying the GError message is posted, the error is cleared and
 * ret becomes GST_FLOW_NOT_LINKED. */
3646 static GstFlowReturn
3647 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3649 DownloadRequest *request;
3650 GstFlowReturn ret = GST_FLOW_OK;
3651 GError *error = NULL;
3653 request = download_request_new_uri (demux->manifest_uri);
3655 download_request_set_callbacks (request,
3656 (DownloadRequestEventCallback) handle_manifest_download_complete,
3657 (DownloadRequestEventCallback) handle_manifest_download_failure,
3660 if (!downloadhelper_submit_request (demux->download_helper, NULL,
3661 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3664 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3665 ("Failed to download manifest: %s", error->message), (NULL));
3666 g_clear_error (&error);
3668 ret = GST_FLOW_NOT_LINKED;
3674 /* Runs the subclass update_manifest() vfunc and keeps its flow return in
 * ret.
 *
 * Must be called with manifest_lock taken */
3676 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3678 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3681 ret = klass->update_manifest (demux);
3686 /* Asks the subclass whether another period follows the current one.
 * ret stays FALSE when the subclass does not implement has_next_period();
 * the outcome is logged at debug level.
 *
 * Must be called with manifest_lock taken */
3688 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3690 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3691 gboolean ret = FALSE;
3693 if (klass->has_next_period)
3694 ret = klass->has_next_period (demux);
3695 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3699 /* Move the input side on to the next period.
 *
 * Calls the subclass advance_period() vfunc (which is required: the function
 * bails out through g_return_if_fail() when it is missing). An unchanged
 * demux->input_period afterwards is treated as failure and logged. Otherwise
 * the tasks of the previous input period are stopped, the collection of the
 * new input period is updated, the selection of the output period is
 * transferred to it, and its streams are prepared and started.
 *
 * Must be called with manifest_lock taken */
3701 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3703 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3704 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3706 g_return_if_fail (klass->advance_period != NULL);
3708 GST_DEBUG_OBJECT (demux, "Advancing to next period");
3709 /* FIXME : no return value ? What if it fails ? */
3710 klass->advance_period (demux);
/* The vfunc has no return value, so failure is detected by the input
 * period not having changed */
3712 if (previous_period == demux->input_period) {
3713 GST_ERROR_OBJECT (demux, "Advancing period failed");
3717 /* Stop the previous period stream tasks */
3718 gst_adaptive_demux_period_stop_tasks (previous_period);
3720 gst_adaptive_demux_update_collection (demux, demux->input_period);
3721 /* Figure out a pre-emptive selection based on the output period selection */
3722 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3723 demux->output_period);
3725 gst_adaptive_demux_prepare_streams (demux, FALSE);
3726 gst_adaptive_demux_start_tasks (demux);
3730 * gst_adaptive_demux2_get_monotonic_time:
3731 * Returns: a monotonically increasing time, using the system realtime clock
/* Reads the current time from demux->realtime_clock through
 * gst_adaptive_demux_clock_get_time(). A NULL @demux trips
 * g_return_val_if_fail() and yields GST_CLOCK_TIME_NONE. */
3734 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3736 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3737 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3741 * gst_adaptive_demux2_get_client_now_utc:
3742 * @demux: #GstAdaptiveDemux
3743 * Returns: the client's estimate of UTC
3745 * Used to find the client's estimate of UTC, using the system realtime clock.
/* Thin wrapper: forwards to gst_adaptive_demux_clock_get_now_utc() on
 * demux->realtime_clock.
 * NOTE(review): unlike gst_adaptive_demux2_get_monotonic_time(), no NULL
 * check on @demux is visible here - confirm callers never pass NULL. */
3748 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3750 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3754 * gst_adaptive_demux2_is_running:
3755 * @demux: #GstAdaptiveDemux
3756 * Returns: whether the demuxer is processing data
3758 * Returns FALSE if shutdown has started (transitioning down from
3759 * PAUSED), otherwise TRUE.
/* Atomically reads the demux->running flag and returns its value. */
3762 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3764 return g_atomic_int_get (&demux->running);
3768 * gst_adaptive_demux2_get_qos_earliest_time:
3770 * Returns: The QOS earliest time
/* Takes a snapshot of demux->priv->qos_earliest_time while holding the
 * object lock, so the value is read consistently with writers that use the
 * same lock. */
3775 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3777 GstClockTime earliest;
3779 GST_OBJECT_LOCK (demux);
3780 earliest = demux->priv->qos_earliest_time;
3781 GST_OBJECT_UNLOCK (demux);
/* Attach @stream to the demuxer's current input period.
 *
 * Preconditions checked with g_return_val_if_fail() (FALSE on violation):
 * @demux and @stream are non-NULL and @stream is not yet attached to a
 * demuxer. Under TRACKS_LOCK the stream is refused (error logged) when the
 * input period is already prepared; otherwise stream->demux and
 * stream->period are set, the stream is appended to input_period->streams
 * and each of its tracks is added to the period. A track that cannot be
 * added is logged as an error. */
3787 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3788 GstAdaptiveDemux2Stream * stream)
3790 g_return_val_if_fail (demux && stream, FALSE);
3792 /* FIXME : Migrate to parent */
3793 g_return_val_if_fail (stream->demux == NULL, FALSE);
3795 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3797 TRACKS_LOCK (demux);
/* A prepared input period means no new period was created for these
 * streams, so they cannot be added */
3798 if (demux->input_period->prepared) {
3799 GST_ERROR_OBJECT (demux,
3800 "Attempted to add streams but no new period was created");
3801 TRACKS_UNLOCK (demux);
3804 stream->demux = demux;
3805 stream->period = demux->input_period;
3806 demux->input_period->streams =
3807 g_list_append (demux->input_period->streams, stream);
3809 if (stream->tracks) {
3811 for (iter = stream->tracks; iter; iter = iter->next)
3812 if (!gst_adaptive_demux_period_add_track (demux->input_period,
3813 (GstAdaptiveDemuxTrack *) iter->data)) {
3814 GST_ERROR_OBJECT (demux, "Failed to add track elements");
3815 TRACKS_UNLOCK (demux);
3819 TRACKS_UNLOCK (demux);
3823 /* Return the current playback rate including any instant rate multiplier */
3825 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3828 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3829 rate = demux->segment.rate * demux->instant_rate_multiplier;
3830 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);