3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative medias (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be download.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
122 #define DEFAULT_MAX_RESOLUTION 0
125 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
127 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
128 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0 /* Automatic */
129 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
130 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
132 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
133 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
135 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
136 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
137 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
142 PROP_CONNECTION_SPEED,
143 PROP_BANDWIDTH_TARGET_RATIO,
144 PROP_CONNECTION_BITRATE,
147 PROP_CURRENT_BANDWIDTH,
148 PROP_MAX_BUFFERING_TIME,
149 PROP_BUFFERING_HIGH_WATERMARK_TIME,
150 PROP_BUFFERING_LOW_WATERMARK_TIME,
151 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
152 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
153 PROP_CURRENT_LEVEL_TIME_VIDEO,
154 PROP_CURRENT_LEVEL_TIME_AUDIO,
155 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
162 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
163 GST_STATIC_PAD_TEMPLATE ("video_%02u",
166 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
172 GST_STATIC_CAPS_ANY);
174 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
175 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
178 GST_STATIC_CAPS_ANY);
180 /* Private structure for a track being outputted */
181 typedef struct _OutputSlot
186 /* Last flow return */
187 GstFlowReturn flow_ret;
192 /* Target track (reference) */
193 GstAdaptiveDemuxTrack *track;
195 /* Pending track (which will replace track) */
196 GstAdaptiveDemuxTrack *pending_track;
198 /* TRUE if a buffer or a gap event was pushed through this slot. */
199 gboolean pushed_timed_data;
202 static GstBinClass *parent_class = NULL;
203 static gint private_offset = 0;
205 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
206 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
207 GstAdaptiveDemuxClass * klass);
208 static void gst_adaptive_demux_finalize (GObject * object);
209 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
210 element, GstStateChange transition);
211 static gboolean gst_adaptive_demux_query (GstElement * element,
213 static gboolean gst_adaptive_demux_send_event (GstElement * element,
216 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
218 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
220 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
221 GstObject * parent, GstBuffer * buffer);
222 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
224 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
226 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
228 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
229 * demux, GstEvent * event);
232 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
234 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
235 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
236 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
237 gboolean first_and_live);
240 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
242 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
244 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
247 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
248 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
249 gboolean stop_updates);
252 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
255 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
256 * method to get to the padtemplates */
258 gst_adaptive_demux_ng_get_type (void)
260 static gsize type = 0;
262 if (g_once_init_enter (&type)) {
264 static const GTypeInfo info = {
265 sizeof (GstAdaptiveDemuxClass),
268 (GClassInitFunc) gst_adaptive_demux_class_init,
271 sizeof (GstAdaptiveDemux),
273 (GInstanceInitFunc) gst_adaptive_demux_init,
276 _type = g_type_register_static (GST_TYPE_BIN,
277 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
280 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
282 g_once_init_leave (&type, _type);
287 static inline GstAdaptiveDemuxPrivate *
288 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
290 return (G_STRUCT_MEMBER_P (self, private_offset));
294 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
295 const GValue * value, GParamSpec * pspec)
297 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
299 GST_OBJECT_LOCK (demux);
302 case PROP_CONNECTION_SPEED:
303 demux->connection_speed = g_value_get_uint (value) * 1000;
304 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
305 demux->connection_speed);
307 case PROP_BANDWIDTH_TARGET_RATIO:
308 demux->bandwidth_target_ratio = g_value_get_float (value);
310 case PROP_MIN_BITRATE:
311 demux->min_bitrate = g_value_get_uint (value);
313 case PROP_MAX_BITRATE:
314 demux->max_bitrate = g_value_get_uint (value);
316 case PROP_CONNECTION_BITRATE:
317 demux->connection_speed = g_value_get_uint (value);
319 /* FIXME: Recalculate track and buffering levels
320 * when watermarks change? */
321 case PROP_MAX_BUFFERING_TIME:
322 demux->max_buffering_time = g_value_get_uint64 (value);
324 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
325 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
327 case PROP_BUFFERING_LOW_WATERMARK_TIME:
328 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
330 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
331 demux->buffering_high_watermark_fragments = g_value_get_double (value);
333 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
334 demux->buffering_low_watermark_fragments = g_value_get_double (value);
336 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
338 demux->max_width = g_value_get_uint (value);
340 case PROP_MAX_HEIGHT:
341 demux->max_height = g_value_get_uint (value);
345 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
349 GST_OBJECT_UNLOCK (demux);
353 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
354 GValue * value, GParamSpec * pspec)
356 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
358 GST_OBJECT_LOCK (demux);
361 case PROP_CONNECTION_SPEED:
362 g_value_set_uint (value, demux->connection_speed / 1000);
364 case PROP_BANDWIDTH_TARGET_RATIO:
365 g_value_set_float (value, demux->bandwidth_target_ratio);
367 case PROP_MIN_BITRATE:
368 g_value_set_uint (value, demux->min_bitrate);
370 case PROP_MAX_BITRATE:
371 g_value_set_uint (value, demux->max_bitrate);
373 case PROP_CONNECTION_BITRATE:
374 g_value_set_uint (value, demux->connection_speed);
376 case PROP_CURRENT_BANDWIDTH:
377 g_value_set_uint (value, demux->current_download_rate);
379 case PROP_MAX_BUFFERING_TIME:
380 g_value_set_uint64 (value, demux->max_buffering_time);
382 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
383 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
385 case PROP_BUFFERING_LOW_WATERMARK_TIME:
386 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
388 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
389 g_value_set_double (value, demux->buffering_high_watermark_fragments);
391 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
392 g_value_set_double (value, demux->buffering_low_watermark_fragments);
394 case PROP_CURRENT_LEVEL_TIME_VIDEO:
395 g_value_set_uint64 (value, demux->current_level_time_video);
397 case PROP_CURRENT_LEVEL_TIME_AUDIO:
398 g_value_set_uint64 (value, demux->current_level_time_audio);
400 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
402 g_value_set_uint (value, demux->max_width);
404 case PROP_MAX_HEIGHT:
405 g_value_set_uint (value, demux->max_height);
409 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
413 GST_OBJECT_UNLOCK (demux);
417 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
419 GObjectClass *gobject_class;
420 GstElementClass *gstelement_class;
421 GstBinClass *gstbin_class;
423 gobject_class = G_OBJECT_CLASS (klass);
424 gstelement_class = GST_ELEMENT_CLASS (klass);
425 gstbin_class = GST_BIN_CLASS (klass);
427 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
428 "Base Adaptive Demux (ng)");
430 parent_class = g_type_class_peek_parent (klass);
432 if (private_offset != 0)
433 g_type_class_adjust_private_offset (klass, &private_offset);
435 gobject_class->set_property = gst_adaptive_demux_set_property;
436 gobject_class->get_property = gst_adaptive_demux_get_property;
437 gobject_class->finalize = gst_adaptive_demux_finalize;
439 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
440 g_param_spec_uint ("connection-speed", "Connection Speed",
441 "Network connection speed to use in kbps (0 = calculate from downloaded"
442 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
443 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
445 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
446 g_param_spec_float ("bandwidth-target-ratio",
447 "Ratio of target bandwidth / available bandwidth",
448 "Limit of the available bitrate to use when switching to alternates",
449 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
450 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
453 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
454 "Network connection speed to use (0 = automatic) (bits/s)",
455 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
456 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
458 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
459 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
460 "Minimum bitrate to use when switching to alternates (bits/s)",
461 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
462 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
465 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
466 "Maximum bitrate to use when switching to alternates (bits/s)",
467 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
468 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
470 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
471 g_object_class_install_property (gobject_class, PROP_MAX_WIDTH,
472 g_param_spec_uint ("max-video-width",
473 "Max video width limit",
474 "Maximum limit of the available video width to use when switching to alternates. (0 = no limit)",
475 0, G_MAXUINT, DEFAULT_MAX_RESOLUTION,
476 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478 g_object_class_install_property (gobject_class, PROP_MAX_HEIGHT,
479 g_param_spec_uint ("max-video-height",
480 "Max video height limit",
481 "Maximum limit of the available video height to use when switching to alternates. (0 = no limit)",
482 0, G_MAXUINT, DEFAULT_MAX_RESOLUTION,
483 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
487 g_param_spec_uint ("current-bandwidth",
488 "Current download bandwidth (bits/s)",
489 "Report of current download bandwidth (based on arriving data) (bits/s)",
490 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
492 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
493 g_param_spec_uint64 ("max-buffering-time",
494 "Buffering maximum size (ns)",
495 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
496 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
497 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
498 G_PARAM_STATIC_STRINGS));
500 g_object_class_install_property (gobject_class,
501 PROP_BUFFERING_HIGH_WATERMARK_TIME,
502 g_param_spec_uint64 ("high-watermark-time",
503 "High buffering watermark size (ns)",
504 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
505 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
506 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
507 G_PARAM_STATIC_STRINGS));
509 g_object_class_install_property (gobject_class,
510 PROP_BUFFERING_LOW_WATERMARK_TIME,
511 g_param_spec_uint64 ("low-watermark-time",
512 "Low buffering watermark size (ns)",
513 "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
514 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
515 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
516 G_PARAM_STATIC_STRINGS));
518 g_object_class_install_property (gobject_class,
519 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
520 g_param_spec_double ("high-watermark-fragments",
521 "High buffering watermark size (fragments)",
522 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
523 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
524 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
525 G_PARAM_STATIC_STRINGS));
527 g_object_class_install_property (gobject_class,
528 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
529 g_param_spec_double ("low-watermark-fragments",
530 "Low buffering watermark size (fragments)",
531 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
532 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
533 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
534 G_PARAM_STATIC_STRINGS));
536 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
537 g_param_spec_uint64 ("current-level-time-video",
538 "Currently buffered level of video (ns)",
539 "Currently buffered level of video track(s) (ns)",
540 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
541 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
542 G_PARAM_STATIC_STRINGS));
544 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
545 g_param_spec_uint64 ("current-level-time-audio",
546 "Currently buffered level of audio (ns)",
547 "Currently buffered level of audio track(s) (ns)",
548 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
549 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
550 G_PARAM_STATIC_STRINGS));
552 gst_element_class_add_static_pad_template (gstelement_class,
553 &gst_adaptive_demux_audiosrc_template);
554 gst_element_class_add_static_pad_template (gstelement_class,
555 &gst_adaptive_demux_videosrc_template);
556 gst_element_class_add_static_pad_template (gstelement_class,
557 &gst_adaptive_demux_subtitlesrc_template);
559 gstelement_class->change_state = gst_adaptive_demux_change_state;
560 gstelement_class->query = gst_adaptive_demux_query;
561 gstelement_class->send_event = gst_adaptive_demux_send_event;
563 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
565 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
566 klass->requires_periodical_playlist_update =
567 gst_adaptive_demux_requires_periodical_playlist_update_default;
568 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
572 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
573 GstAdaptiveDemuxClass * klass)
575 GstPadTemplate *pad_template;
577 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
579 demux->priv = gst_adaptive_demux_get_instance_private (demux);
580 demux->priv->input_adapter = gst_adapter_new ();
581 demux->realtime_clock = gst_adaptive_demux_clock_new ();
583 demux->download_helper = downloadhelper_new (demux->realtime_clock);
584 demux->priv->segment_seqnum = gst_util_seqnum_next ();
585 demux->have_group_id = FALSE;
586 demux->group_id = G_MAXUINT;
588 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
589 demux->instant_rate_multiplier = 1.0;
591 GST_OBJECT_FLAG_SET (demux, GST_BIN_FLAG_STREAMS_AWARE);
592 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
593 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
595 g_rec_mutex_init (&demux->priv->manifest_lock);
597 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
599 g_mutex_init (&demux->priv->api_lock);
600 g_mutex_init (&demux->priv->segment_lock);
602 g_mutex_init (&demux->priv->tracks_lock);
603 g_cond_init (&demux->priv->tracks_add);
605 g_mutex_init (&demux->priv->buffering_lock);
607 demux->priv->periods = g_queue_new ();
610 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
611 g_return_if_fail (pad_template != NULL);
613 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
614 gst_pad_set_event_function (demux->sinkpad,
615 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
616 gst_pad_set_chain_function (demux->sinkpad,
617 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
620 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
621 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
622 demux->min_bitrate = DEFAULT_MIN_BITRATE;
623 demux->max_bitrate = DEFAULT_MAX_BITRATE;
624 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
625 demux->max_width = DEFAULT_MAX_RESOLUTION;
626 demux->max_height = DEFAULT_MAX_RESOLUTION;
629 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
630 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
631 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
632 demux->buffering_high_watermark_fragments =
633 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
634 demux->buffering_low_watermark_fragments =
635 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
637 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
638 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
640 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
642 demux->priv->duration = GST_CLOCK_TIME_NONE;
644 /* Output combiner */
645 demux->priv->flowcombiner = gst_flow_combiner_new ();
648 g_rec_mutex_init (&demux->priv->output_lock);
649 demux->priv->output_task =
650 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
652 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
656 gst_adaptive_demux_finalize (GObject * object)
658 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
659 GstAdaptiveDemuxPrivate *priv = demux->priv;
661 GST_DEBUG_OBJECT (object, "finalize");
663 g_object_unref (priv->input_adapter);
665 downloadhelper_free (demux->download_helper);
667 g_rec_mutex_clear (&demux->priv->manifest_lock);
668 g_mutex_clear (&demux->priv->api_lock);
669 g_mutex_clear (&demux->priv->segment_lock);
671 g_mutex_clear (&demux->priv->buffering_lock);
673 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
675 /* The input period is present after a reset, clear it now */
676 if (demux->input_period)
677 gst_adaptive_demux_period_unref (demux->input_period);
679 if (demux->realtime_clock) {
680 gst_adaptive_demux_clock_unref (demux->realtime_clock);
681 demux->realtime_clock = NULL;
683 g_object_unref (priv->output_task);
684 g_rec_mutex_clear (&priv->output_lock);
686 gst_flow_combiner_free (priv->flowcombiner);
688 g_queue_free (priv->periods);
690 G_OBJECT_CLASS (parent_class)->finalize (object);
694 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
696 gboolean ret = FALSE;
697 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
700 ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
701 gst_object_unref (parent);
707 static GstStateChangeReturn
708 gst_adaptive_demux_change_state (GstElement * element,
709 GstStateChange transition)
711 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
712 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
714 switch (transition) {
715 case GST_STATE_CHANGE_NULL_TO_READY:
716 if (!gst_adaptive_demux_check_streams_aware (demux)) {
717 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
718 (_("Element requires a streams-aware context.")), (NULL));
722 case GST_STATE_CHANGE_PAUSED_TO_READY:
723 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
724 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
726 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
727 downloadhelper_stop (demux->download_helper);
730 demux->priv->flushing = TRUE;
731 g_cond_signal (&demux->priv->tracks_add);
732 gst_task_stop (demux->priv->output_task);
733 TRACKS_UNLOCK (demux);
735 gst_task_join (demux->priv->output_task);
737 GST_API_LOCK (demux);
738 gst_adaptive_demux_reset (demux);
739 GST_API_UNLOCK (demux);
741 case GST_STATE_CHANGE_READY_TO_PAUSED:
742 GST_API_LOCK (demux);
743 gst_adaptive_demux_reset (demux);
745 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
746 if (g_atomic_int_get (&demux->priv->have_manifest))
747 gst_adaptive_demux_start_manifest_update_task (demux);
748 GST_API_UNLOCK (demux);
749 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
750 GST_DEBUG_OBJECT (demux, "demuxer has started running");
751 /* gst_task_start (demux->priv->output_task); */
757 /* this must be run with the scheduler and output tasks stopped. */
758 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
760 switch (transition) {
761 case GST_STATE_CHANGE_READY_TO_PAUSED:
762 /* Start download task */
763 downloadhelper_start (demux->download_helper);
774 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
777 GstEvent *eos = gst_event_new_eos ();
778 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
780 /* FIXME: The slot might not have output any data, caps or segment yet */
781 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
782 gst_pad_push_event (slot->pad, eos);
783 gst_pad_set_active (slot->pad, FALSE);
784 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
785 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
787 gst_adaptive_demux_track_unref (slot->track);
788 if (slot->pending_track)
789 gst_adaptive_demux_track_unref (slot->pending_track);
791 g_slice_free (OutputSlot, slot);
795 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
796 GstStreamType streamtype)
799 GstPadTemplate *tmpl;
802 switch (streamtype) {
803 case GST_STREAM_TYPE_AUDIO:
804 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
806 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
808 case GST_STREAM_TYPE_VIDEO:
809 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
811 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
813 case GST_STREAM_TYPE_TEXT:
815 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
817 gst_static_pad_template_get
818 (&gst_adaptive_demux_subtitlesrc_template);
821 g_assert_not_reached ();
825 slot = g_slice_new0 (OutputSlot);
826 slot->type = streamtype;
827 slot->pushed_timed_data = FALSE;
829 /* Create and activate new pads */
830 slot->pad = gst_pad_new_from_template (tmpl, name);
832 gst_object_unref (tmpl);
834 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
835 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
836 gst_pad_set_active (slot->pad, TRUE);
838 gst_pad_set_query_function (slot->pad,
839 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
840 gst_pad_set_event_function (slot->pad,
841 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
843 gst_pad_set_element_private (slot->pad, slot);
845 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
846 GST_DEBUG_PAD_NAME (slot->pad));
851 * * After `process_manifest` or when a period starts
852 * * Or when all tracks have been created
854 * Goes over tracks and creates the collection
856 * Returns TRUE if the collection was fully created.
858 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
861 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
862 GstAdaptiveDemuxPeriod * period)
864 GstStreamCollection *collection;
867 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
869 if (!period->tracks_changed) {
870 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
874 if (!period->tracks) {
875 GST_WARNING_OBJECT (demux, "No tracks registered/present");
879 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
880 GST_DEBUG_OBJECT (demux,
881 "Streams still have pending tracks, not creating/updating collection");
885 /* Update collection */
886 collection = gst_stream_collection_new ("adaptivedemux");
888 for (iter = period->tracks; iter; iter = iter->next) {
889 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
891 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
892 gst_stream_collection_add_stream (collection,
893 gst_object_ref (track->stream_object));
896 if (period->collection)
897 gst_object_unref (period->collection);
898 period->collection = collection;
904 * Called for the output period:
905 * * after `update_collection()` if the input period is the same as the output period
906 * * When the output period changes
908 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
911 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
913 GstStreamCollection *collection;
914 GstAdaptiveDemuxPeriod *period = demux->output_period;
915 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
917 g_return_val_if_fail (period, FALSE);
918 if (!period->collection) {
919 GST_DEBUG_OBJECT (demux, "No collection available yet");
923 collection = period->collection;
925 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
928 /* Post collection */
929 TRACKS_UNLOCK (demux);
930 GST_MANIFEST_UNLOCK (demux);
932 gst_element_post_message (GST_ELEMENT_CAST (demux),
933 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
935 GST_MANIFEST_LOCK (demux);
938 /* If no stream selection was handled, make a default selection */
939 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
940 gst_adaptive_demux_period_select_default_tracks (demux,
941 demux->output_period);
944 /* Make sure the output task is running */
945 if (gst_adaptive_demux2_is_running (demux)) {
946 demux->priv->flushing = FALSE;
947 GST_DEBUG_OBJECT (demux, "Starting the output task");
948 gst_task_start (demux->priv->output_task);
955 handle_incoming_manifest (GstAdaptiveDemux * demux)
957 GstAdaptiveDemuxClass *demux_class;
962 GstBuffer *manifest_buffer;
964 GST_API_LOCK (demux);
965 GST_MANIFEST_LOCK (demux);
967 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
969 available = gst_adapter_available (demux->priv->input_adapter);
972 goto eos_without_data;
974 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
976 /* Need to get the URI to use it as a base to generate the fragment's
978 query = gst_query_new_uri ();
979 query_res = gst_pad_peer_query (demux->sinkpad, query);
981 gchar *uri, *redirect_uri;
984 gst_query_parse_uri (query, &uri);
985 gst_query_parse_uri_redirection (query, &redirect_uri);
986 gst_query_parse_uri_redirection_permanent (query, &permanent);
988 if (permanent && redirect_uri) {
989 demux->manifest_uri = redirect_uri;
990 demux->manifest_base_uri = NULL;
993 demux->manifest_uri = uri;
994 demux->manifest_base_uri = redirect_uri;
997 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
998 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
1000 if (!g_str_has_prefix (demux->manifest_uri, "data:")
1001 && !g_str_has_prefix (demux->manifest_uri, "http://")
1002 && !g_str_has_prefix (demux->manifest_uri, "https://")) {
1003 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1004 (_("Invalid manifest URI")),
1005 ("Manifest URI needs to use either data:, http:// or https://"));
1006 gst_query_unref (query);
1011 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
1013 gst_query_unref (query);
1015 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
1016 if (!demux->have_group_id) {
1017 demux->have_group_id = TRUE;
1018 demux->group_id = gst_util_group_id_next ();
1021 /* Let the subclass parse the manifest */
1023 gst_adapter_take_buffer (demux->priv->input_adapter, available);
1024 ret = demux_class->process_manifest (demux, manifest_buffer);
1025 gst_buffer_unref (manifest_buffer);
1027 gst_element_post_message (GST_ELEMENT_CAST (demux),
1028 gst_message_new_element (GST_OBJECT_CAST (demux),
1029 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
1030 "manifest-uri", G_TYPE_STRING,
1031 demux->manifest_uri, "uri", G_TYPE_STRING,
1032 demux->manifest_uri,
1033 "manifest-download-start", GST_TYPE_CLOCK_TIME,
1034 GST_CLOCK_TIME_NONE,
1035 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
1036 gst_util_get_timestamp (), NULL)));
1039 goto invalid_manifest;
1041 /* Streams should have been added to the input period if the manifest parsing
1043 if (!demux->input_period->streams)
1046 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1048 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1049 /* Send duration message */
1050 if (!gst_adaptive_demux_is_live (demux)) {
1051 GstClockTime duration = demux_class->get_duration (demux);
1053 demux->priv->duration = duration;
1054 if (duration != GST_CLOCK_TIME_NONE) {
1055 GST_DEBUG_OBJECT (demux,
1056 "Sending duration message : %" GST_TIME_FORMAT,
1057 GST_TIME_ARGS (duration));
1058 gst_element_post_message (GST_ELEMENT (demux),
1059 gst_message_new_duration_changed (GST_OBJECT (demux)));
1061 GST_DEBUG_OBJECT (demux,
1062 "media duration unknown, can not send the duration message");
1066 TRACKS_LOCK (demux);
1067 /* New streams/tracks will have been added to the input period */
1068 /* The input period has streams, make it the active output period */
1069 /* FIXME : Factorize this into a function to make a period active */
1070 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1071 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1072 gst_adaptive_demux_post_collection (demux);
1073 TRACKS_UNLOCK (demux);
1075 gst_adaptive_demux_prepare_streams (demux,
1076 gst_adaptive_demux_is_live (demux));
1077 gst_adaptive_demux_start_tasks (demux);
1078 gst_adaptive_demux_start_manifest_update_task (demux);
1081 GST_MANIFEST_UNLOCK (demux);
1082 GST_API_UNLOCK (demux);
1089 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1097 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1098 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1099 (_("This file contains no playable streams.")),
1100 ("No known stream formats found at the Manifest"));
1107 GST_MANIFEST_UNLOCK (demux);
1108 GST_API_UNLOCK (demux);
1110 /* In most cases, this will happen if we set a wrong url in the
1111 * source element and we have received the 404 HTML response instead of
1113 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1118 struct http_headers_collector
1120 GstAdaptiveDemux *demux;
1125 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1126 const GValue * value, gpointer userdata)
1128 struct http_headers_collector *hdr_data = userdata;
1129 GstAdaptiveDemux *demux = hdr_data->demux;
1130 const gchar *field_name = g_quark_to_string (field_id);
1132 if (G_UNLIKELY (value == NULL))
1133 return TRUE; /* This should not happen */
1135 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1136 const gchar *user_agent = g_value_get_string (value);
1138 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1139 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1142 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1143 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1144 guint i = 0, prev_len = 0, total_len = 0;
1145 gchar **cookies = NULL;
1147 if (hdr_data->cookies != NULL)
1148 prev_len = g_strv_length (hdr_data->cookies);
1150 if (GST_VALUE_HOLDS_ARRAY (value)) {
1151 total_len = gst_value_array_get_size (value) + prev_len;
1152 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1154 for (i = 0; i < gst_value_array_get_size (value); i++) {
1155 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1156 g_value_get_string (gst_value_array_get_value (value, i)));
1157 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1159 } else if (G_VALUE_HOLDS_STRING (value)) {
1160 total_len = 1 + prev_len;
1161 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1163 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1164 g_value_get_string (value));
1165 cookies[0] = g_value_dup_string (value);
1167 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1168 g_quark_to_string (field_id));
1174 for (j = 0; j < prev_len; j++) {
1175 GST_DEBUG_OBJECT (demux,
1176 "Append existing cookie %s", hdr_data->cookies[j]);
1177 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1180 cookies[total_len] = NULL;
1182 g_strfreev (hdr_data->cookies);
1183 hdr_data->cookies = cookies;
1187 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1188 const gchar *referer = g_value_get_string (value);
1189 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1191 downloadhelper_set_referer (demux->download_helper, referer);
1194 /* Date header can be used to estimate server offset */
1195 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1196 const gchar *http_date = g_value_get_string (value);
1199 GstDateTime *datetime =
1200 gst_adaptive_demux_util_parse_http_head_date (http_date);
1203 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1204 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1206 GST_INFO_OBJECT (demux,
1207 "HTTP response Date %s", GST_STR_NULL (date_string));
1208 g_free (date_string);
1210 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1212 g_date_time_unref (utc_now);
1213 gst_date_time_unref (datetime);
1222 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1225 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1228 switch (event->type) {
1229 case GST_EVENT_FLUSH_STOP:{
1230 GST_API_LOCK (demux);
1231 GST_MANIFEST_LOCK (demux);
1233 gst_adaptive_demux_reset (demux);
1235 ret = gst_pad_event_default (pad, parent, event);
1237 GST_MANIFEST_UNLOCK (demux);
1238 GST_API_UNLOCK (demux);
1244 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1245 if (!handle_incoming_manifest (demux)) {
1246 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1247 return gst_pad_event_default (pad, parent, event);
1249 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1251 GST_ERROR_OBJECT (demux,
1252 "Failed to acquire scheduler to handle manifest");
1253 return gst_pad_event_default (pad, parent, event);
1255 gst_event_unref (event);
1258 case GST_EVENT_STREAM_START:
1259 if (gst_event_parse_group_id (event, &demux->group_id))
1260 demux->have_group_id = TRUE;
1262 demux->have_group_id = FALSE;
1263 /* Swallow stream-start, we'll push our own */
1264 gst_event_unref (event);
1266 case GST_EVENT_SEGMENT:
1267 /* Swallow newsegments, we'll push our own */
1268 gst_event_unref (event);
1270 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1271 const GstStructure *structure = gst_event_get_structure (event);
1272 struct http_headers_collector c = { demux, NULL };
1274 if (gst_structure_has_name (structure, "http-headers")) {
1275 if (gst_structure_has_field (structure, "request-headers")) {
1276 GstStructure *req_headers = NULL;
1277 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1278 &req_headers, NULL);
1280 gst_structure_foreach (req_headers,
1281 gst_adaptive_demux_handle_upstream_http_header, &c);
1282 gst_structure_free (req_headers);
1285 if (gst_structure_has_field (structure, "response-headers")) {
1286 GstStructure *res_headers = NULL;
1287 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1288 &res_headers, NULL);
1290 gst_structure_foreach (res_headers,
1291 gst_adaptive_demux_handle_upstream_http_header, &c);
1292 gst_structure_free (res_headers);
1297 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1305 return gst_pad_event_default (pad, parent, event);
1308 static GstFlowReturn
1309 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1312 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1314 GST_MANIFEST_LOCK (demux);
1316 gst_adapter_push (demux->priv->input_adapter, buffer);
1318 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1319 (gint) gst_adapter_available (demux->priv->input_adapter));
1321 GST_MANIFEST_UNLOCK (demux);
1326 /* Called with TRACKS_LOCK taken */
1328 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1332 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1333 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1335 gst_adaptive_demux_track_flush (track);
1336 if (gst_pad_is_active (track->sinkpad)) {
1337 gst_pad_set_active (track->sinkpad, FALSE);
1338 gst_pad_set_active (track->sinkpad, TRUE);
1343 /* Resets all tracks to their initial state, ready to receive new data. */
1345 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1347 TRACKS_LOCK (demux);
1348 g_queue_foreach (demux->priv->periods,
1349 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1350 TRACKS_UNLOCK (demux);
1353 /* Subclasses will call this function to ensure that a new input period is
1354 * available to receive new streams and tracks */
1356 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1358 if (demux->input_period && !demux->input_period->prepared) {
1359 GST_DEBUG_OBJECT (demux, "Using existing input period");
1363 if (demux->input_period) {
1364 GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1365 demux->input_period->has_next_period = TRUE;
1367 GST_DEBUG_OBJECT (demux, "Setting up new period");
1369 demux->input_period = gst_adaptive_demux_period_new (demux);
1374 /* must be called with manifest_lock taken */
1376 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1378 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1381 gst_adaptive_demux_stop_tasks (demux, TRUE);
1384 klass->reset (demux);
1386 /* Disable and remove all outputs */
1387 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1388 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1389 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1391 g_list_free (demux->priv->outputs);
1392 demux->priv->outputs = NULL;
1394 g_queue_clear_full (demux->priv->periods,
1395 (GDestroyNotify) gst_adaptive_demux_period_unref);
1397 /* The output period always has an extra ref taken on it */
1398 if (demux->output_period)
1399 gst_adaptive_demux_period_unref (demux->output_period);
1400 demux->output_period = NULL;
1401 /* The input period doesn't have an extra ref taken on it */
1402 demux->input_period = NULL;
1404 gst_adaptive_demux_start_new_period (demux);
1406 g_free (demux->manifest_uri);
1407 g_free (demux->manifest_base_uri);
1408 demux->manifest_uri = NULL;
1409 demux->manifest_base_uri = NULL;
1411 gst_adapter_clear (demux->priv->input_adapter);
1412 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1414 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1415 demux->instant_rate_multiplier = 1.0;
1417 demux->priv->duration = GST_CLOCK_TIME_NONE;
1419 demux->priv->percent = -1;
1420 demux->priv->is_buffering = TRUE;
1422 demux->have_group_id = FALSE;
1423 demux->group_id = G_MAXUINT;
1424 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1426 demux->priv->global_output_position = 0;
1428 demux->priv->n_audio_streams = 0;
1429 demux->priv->n_video_streams = 0;
1430 demux->priv->n_subtitle_streams = 0;
1432 gst_flow_combiner_reset (demux->priv->flowcombiner);
1436 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1438 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1439 gboolean res = FALSE;
1441 GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1443 switch (GST_EVENT_TYPE (event)) {
1444 case GST_EVENT_SEEK:
1446 res = gst_adaptive_demux_handle_seek_event (demux, event);
1449 case GST_EVENT_SELECT_STREAMS:
1451 res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1455 res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1461 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1462 static GstAdaptiveDemux2Stream *
1463 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1467 /* We only look in the streams of the input period (i.e. with active streams) */
1468 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1469 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1470 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1479 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1482 GstAdaptiveDemux2Stream *stream;
1483 GstStreamCollection *collection = NULL;
1484 gboolean pending_tracks_activated = FALSE;
1486 GST_MANIFEST_LOCK (demux);
1488 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1489 if (stream == NULL) {
1490 GST_WARNING_OBJECT (demux,
1491 "Failed to locate stream for collection message");
1495 gst_message_parse_stream_collection (msg, &collection);
1499 TRACKS_LOCK (demux);
1501 if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1502 &pending_tracks_activated)) {
1503 TRACKS_UNLOCK (demux);
1505 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1506 (_("Stream format can't be handled")),
1507 ("The streams provided by the multiplex are ambiguous"));
1511 if (pending_tracks_activated) {
1512 /* If pending tracks were handled, then update the demuxer collection */
1513 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1514 demux->input_period == demux->output_period) {
1515 gst_adaptive_demux_post_collection (demux);
1518 /* If we discovered pending tracks and we no longer have any, we can ensure
1519 * selected tracks are started */
1520 if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1521 GList *iter = demux->input_period->streams;
1522 for (; iter; iter = iter->next) {
1523 GstAdaptiveDemux2Stream *new_stream = iter->data;
1525 /* The stream that posted this collection was already started. If a
1526 * different stream is now selected, start it */
1527 if (stream != new_stream
1528 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1529 gst_adaptive_demux2_stream_start (new_stream);
1533 TRACKS_UNLOCK (demux);
1536 GST_MANIFEST_UNLOCK (demux);
1539 gst_object_unref (collection);
1540 gst_message_unref (msg);
1545 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1547 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1549 switch (GST_MESSAGE_TYPE (msg)) {
1550 case GST_MESSAGE_STREAM_COLLECTION:
1552 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1555 case GST_MESSAGE_ERROR:{
1556 GstAdaptiveDemux2Stream *stream = NULL;
1558 gchar *debug = NULL;
1559 gchar *new_error = NULL;
1560 const GstStructure *details = NULL;
1562 GST_MANIFEST_LOCK (demux);
1564 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1565 if (stream == NULL) {
1566 GST_WARNING_OBJECT (demux,
1567 "Failed to locate stream for errored element");
1568 GST_MANIFEST_UNLOCK (demux);
1572 gst_message_parse_error (msg, &err, &debug);
1574 GST_WARNING_OBJECT (demux,
1575 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1576 err->message, debug);
1579 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1581 g_free (err->message);
1582 err->message = new_error;
1585 gst_message_parse_error_details (msg, &details);
1587 gst_structure_get_uint (details, "http-status-code",
1588 &stream->last_status_code);
1591 /* error, but ask to retry */
1592 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1593 gst_adaptive_demux2_stream_parse_error (stream, err);
1594 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1600 GST_MANIFEST_UNLOCK (demux);
1602 gst_message_unref (msg);
1611 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1614 /* must be called with manifest_lock taken */
1616 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1618 GstAdaptiveDemuxClass *klass;
1620 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1622 if (klass->get_period_start_time == NULL)
1625 return klass->get_period_start_time (demux);
1628 /* must be called with manifest_lock taken */
1630 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1631 gboolean first_and_live)
1634 GstClockTime period_start;
1635 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1638 g_return_val_if_fail (demux->input_period->streams, FALSE);
1639 g_assert (demux->input_period->prepared == FALSE);
1641 new_streams = demux->input_period->streams;
1643 if (!gst_adaptive_demux2_is_running (demux)) {
1644 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1648 GST_DEBUG_OBJECT (demux,
1649 "Preparing %d streams for period %d , first_and_live:%d",
1650 g_list_length (new_streams), demux->input_period->period_num,
1653 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1654 GstAdaptiveDemux2Stream *stream = iter->data;
1656 GST_DEBUG_OBJECT (stream, "Preparing stream");
1658 stream->need_header = TRUE;
1659 stream->discont = TRUE;
1661 /* Grab the first stream time for live streams
1662 * * If the stream is selected
1663 * * Or it provides dynamic tracks (in which case we need to force an update)
1666 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1667 || stream->pending_tracks)) {
1668 /* TODO we only need the first timestamp, maybe create a simple function to
1669 * get the current PTS of a fragment ? */
1670 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1671 gst_adaptive_demux2_stream_update_fragment_info (stream);
1673 GST_DEBUG_OBJECT (stream,
1674 "Got stream time %" GST_STIME_FORMAT,
1675 GST_STIME_ARGS (stream->fragment.stream_time));
1677 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1678 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1680 min_stream_time = stream->fragment.stream_time;
1685 period_start = gst_adaptive_demux_get_period_start_time (demux);
1687 /* For live streams, the subclass is supposed to seek to the current fragment
1688 * and then tell us its stream time in stream->fragment.stream_time. We now
1689 * also have to seek our demuxer segment to reflect this.
1691 * FIXME: This needs some refactoring at some point.
1693 if (first_and_live) {
1694 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1695 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1696 GST_SEEK_TYPE_NONE, -1, NULL);
1699 GST_DEBUG_OBJECT (demux,
1700 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1701 " demux segment %" GST_SEGMENT_FORMAT,
1702 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1705 /* Synchronize stream start/current positions */
1706 if (min_stream_time == GST_CLOCK_STIME_NONE)
1707 min_stream_time = period_start;
1709 min_stream_time += period_start;
1710 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1711 GstAdaptiveDemux2Stream *stream = iter->data;
1712 stream->start_position = stream->current_position = min_stream_time;
1715 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1716 GstAdaptiveDemux2Stream *stream = iter->data;
1717 stream->compute_segment = TRUE;
1718 stream->first_and_live = first_and_live;
1720 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1721 demux->input_period->prepared = TRUE;
1726 static GstAdaptiveDemuxTrack *
1727 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1731 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1732 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1733 if (!g_strcmp0 (track->stream_id, stream_id))
1740 /* TRACKS_LOCK hold */
1742 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1744 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1745 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1746 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1748 gint min_percent = -1, percent;
1749 gboolean all_eos = TRUE;
1751 /* Go over all active tracks of the output period and update level */
1753 /* Check that all tracks are above their respective low thresholds (different
1754 * tracks may have different fragment durations yielding different buffering
1755 * percentages) Overall buffering percent is the lowest. */
1756 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1757 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1759 GST_LOG_OBJECT (demux,
1760 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1761 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1762 track->stream_id, track->period_num, track->active, track->selected,
1763 track->eos, GST_TIME_ARGS (track->level_time),
1764 GST_TIME_ARGS (track->buffering_threshold));
1766 if (track->active && track->selected) {
1771 if (min_level_time == GST_CLOCK_TIME_NONE) {
1772 min_level_time = track->level_time;
1773 } else if (track->level_time < min_level_time) {
1774 min_level_time = track->level_time;
1777 if (track->type & GST_STREAM_TYPE_VIDEO
1778 && video_level_time > track->level_time)
1779 video_level_time = track->level_time;
1781 if (track->type & GST_STREAM_TYPE_AUDIO
1782 && audio_level_time > track->level_time)
1783 audio_level_time = track->level_time;
1785 if (track->level_time != GST_CLOCK_TIME_NONE
1786 && track->buffering_threshold != 0) {
1788 gst_util_uint64_scale (track->level_time, 100,
1789 track->buffering_threshold);
1790 if (min_percent < 0 || cur_percent < min_percent)
1791 min_percent = cur_percent;
1797 GST_DEBUG_OBJECT (demux,
1798 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1799 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1801 /* Update demuxer video/audio level properties */
1802 GST_OBJECT_LOCK (demux);
1803 demux->current_level_time_video = video_level_time;
1804 demux->current_level_time_audio = audio_level_time;
1805 GST_OBJECT_UNLOCK (demux);
1807 if (min_percent < 0 && !all_eos)
1810 if (min_percent > 100 || all_eos)
1813 percent = MAX (0, min_percent);
1815 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1817 if (demux->priv->is_buffering) {
1819 demux->priv->is_buffering = FALSE;
1820 if (demux->priv->percent != percent) {
1821 demux->priv->percent = percent;
1822 demux->priv->percent_changed = TRUE;
1824 } else if (percent < 1) {
1825 demux->priv->is_buffering = TRUE;
1826 if (demux->priv->percent != percent) {
1827 demux->priv->percent = percent;
1828 demux->priv->percent_changed = TRUE;
1832 if (demux->priv->percent_changed)
1833 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1834 demux->priv->is_buffering);
1837 /* With TRACKS_LOCK held */
1839 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1844 if (!demux->priv->percent_changed)
1847 BUFFERING_LOCK (demux);
1848 percent = demux->priv->percent;
1849 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1850 TRACKS_UNLOCK (demux);
1851 gst_element_post_message ((GstElement *) demux, msg);
1853 BUFFERING_UNLOCK (demux);
1854 TRACKS_LOCK (demux);
1855 if (percent == demux->priv->percent)
1856 demux->priv->percent_changed = FALSE;
1859 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1860 GstAdaptiveDemux2Stream *
1861 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1862 GstAdaptiveDemuxTrack * track)
1866 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1867 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1868 if (g_list_find (stream->tracks, track))
1875 /* Called from seek handler
1877 * This function is used when a (flushing) seek caused a new period to be activated.
1879 * This will ensure that:
1880 * * the current output period is marked as finished (EOS)
1881 * * Any potential intermediate (non-input/non-output) periods are removed
1882 * * That the new input period is prepared and ready
1885 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1889 GST_DEBUG_OBJECT (demux,
1890 "Preparing new input period %u", demux->input_period->period_num);
1892 /* Prepare the new input period */
1893 gst_adaptive_demux_update_collection (demux, demux->input_period);
1895 /* Transfer the previous selection to the new input period */
1896 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1897 demux->output_period);
1898 gst_adaptive_demux_prepare_streams (demux, FALSE);
1900 /* Remove all periods except for the input (last) and output (first) period */
1901 while (demux->priv->periods->length > 2) {
1902 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1903 /* Mark all tracks of the removed period as not selected and EOS so they
1904 * will be skipped / ignored */
1905 for (iter = period->tracks; iter; iter = iter->next) {
1906 GstAdaptiveDemuxTrack *track = iter->data;
1907 track->selected = FALSE;
1910 gst_adaptive_demux_period_unref (period);
1913 /* Mark all tracks of the output period as EOS so that the output loop
1914 * will immediately move to the new period */
1915 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1916 GstAdaptiveDemuxTrack *track = iter->data;
1920 /* Go over all slots, and clear any pending track */
1921 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1922 OutputSlot *slot = (OutputSlot *) iter->data;
1924 if (slot->pending_track != NULL) {
1925 GST_DEBUG_OBJECT (demux,
1926 "Removing track '%s' as pending from output of current track '%s'",
1927 slot->pending_track->stream_id, slot->track->stream_id);
1928 gst_adaptive_demux_track_unref (slot->pending_track);
1929 slot->pending_track = NULL;
1934 /* must be called with manifest_lock taken */
1936 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1937 gint64 * range_start, gint64 * range_stop)
1939 GstAdaptiveDemuxClass *klass;
1941 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1943 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1945 return klass->get_live_seek_range (demux, range_start, range_stop);
1948 /* must be called with manifest_lock taken */
1950 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1951 GstAdaptiveDemux2Stream * stream)
1953 gint64 range_start, range_stop;
1954 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1955 GST_LOG_OBJECT (stream,
1956 "stream position %" GST_TIME_FORMAT " live seek range %"
1957 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1958 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1959 GST_STIME_ARGS (range_stop));
1960 return (stream->current_position >= range_start
1961 && stream->current_position <= range_stop);
1967 /* must be called with manifest_lock taken */
1969 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1971 GstAdaptiveDemuxClass *klass;
1973 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1974 if (gst_adaptive_demux_is_live (demux)) {
1975 return klass->get_live_seek_range != NULL;
1978 return klass->seek != NULL;
1982 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1983 GstSeekType start_type, GstSeekType stop_type)
1987 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1988 GstAdaptiveDemux2Stream *stream = iter->data;
1990 /* Make sure the download loop clears and restarts on the next start,
1991 * which will recompute the stream segment */
1992 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1993 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1994 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1995 stream->start_position = 0;
1997 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1998 stream->start_position = demux->segment.start;
1999 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2000 stream->start_position = demux->segment.stop;
2004 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
2005 GST_SEEK_FLAG_SNAP_AFTER | \
2006 GST_SEEK_FLAG_SNAP_NEAREST | \
2007 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2008 GST_SEEK_FLAG_KEY_UNIT))
2009 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2010 GST_SEEK_FLAG_SNAP_AFTER | \
2011 GST_SEEK_FLAG_SNAP_NEAREST))
2014 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2017 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2021 GstSeekType start_type, stop_type;
2025 gboolean ret = FALSE;
2026 GstSegment oldsegment;
2027 GstEvent *flush_event;
2029 GST_INFO_OBJECT (demux, "Received seek event");
2031 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2034 if (format != GST_FORMAT_TIME) {
2035 GST_WARNING_OBJECT (demux,
2036 "Adaptive demuxers only support TIME-based seeking");
2037 gst_event_unref (event);
2041 if (flags & GST_SEEK_FLAG_SEGMENT) {
2042 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2043 gst_event_unref (event);
2047 seqnum = gst_event_get_seqnum (event);
2049 GST_API_LOCK (demux);
2050 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2051 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2052 GST_API_UNLOCK (demux);
2056 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2057 /* For instant rate seeks, reply directly and update
2058 * our segment so the new rate is reflected in any future
2061 gdouble rate_multiplier;
2063 /* instant rate change only supported if direction does not change. All
2064 * other requirements are already checked before creating the seek event
2065 * but let's double-check here to be sure */
2066 if ((demux->segment.rate > 0 && rate < 0) ||
2067 (demux->segment.rate < 0 && rate > 0) ||
2068 start_type != GST_SEEK_TYPE_NONE ||
2069 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2070 GST_ERROR_OBJECT (demux,
2071 "Instant rate change seeks only supported in the "
2072 "same direction, without flushing and position change");
2076 rate_multiplier = rate / demux->segment.rate;
2078 ev = gst_event_new_instant_rate_change (rate_multiplier,
2079 (GstSegmentFlags) flags);
2080 gst_event_set_seqnum (ev, seqnum);
2082 ret = gst_adaptive_demux_push_src_event (demux, ev);
2085 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2086 demux->instant_rate_multiplier = rate_multiplier;
2087 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2092 if (!gst_adaptive_demux_can_seek (demux))
2095 /* We can only accept flushing seeks from this point onward */
2096 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2097 GST_ERROR_OBJECT (demux,
2098 "Non-flushing non-instant-rate seeks are not possible");
2102 if (gst_adaptive_demux_is_live (demux)) {
2103 gint64 range_start, range_stop;
2104 gboolean changed = FALSE;
2105 gboolean start_valid = TRUE, stop_valid = TRUE;
2107 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2109 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2113 GST_DEBUG_OBJECT (demux,
2114 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2115 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2117 /* Handle relative positioning for live streams (relative to the range_stop) */
2118 if (start_type == GST_SEEK_TYPE_END) {
2119 start = range_stop + start;
2120 start_type = GST_SEEK_TYPE_SET;
2123 if (stop_type == GST_SEEK_TYPE_END) {
2124 stop = range_stop + stop;
2125 stop_type = GST_SEEK_TYPE_SET;
2129 /* Adjust the requested start/stop position if it falls beyond the live
2131 * The only case where we don't adjust is for the starting point of
2132 * an accurate seek (start if forward and stop if backwards)
2134 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2135 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2136 GST_DEBUG_OBJECT (demux,
2137 "seek before live stream start, setting to range start: %"
2138 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2139 start = range_start;
2142 /* truncate stop position also if set */
2143 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2144 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2145 GST_DEBUG_OBJECT (demux,
2146 "seek ending after live start, adjusting to: %"
2147 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2152 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2153 (start < range_start || start > range_stop)) {
2154 GST_WARNING_OBJECT (demux,
2155 "Seek to invalid position start:%" GST_STIME_FORMAT
2156 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2157 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2158 GST_STIME_ARGS (range_stop));
2159 start_valid = FALSE;
2161 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2162 (stop < range_start || stop > range_stop)) {
2163 GST_WARNING_OBJECT (demux,
2164 "Seek to invalid position stop:%" GST_STIME_FORMAT
2165 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2166 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2167 GST_STIME_ARGS (range_stop));
2171 /* If the seek position is still outside of the seekable range, refuse the seek */
2172 if (!start_valid || !stop_valid)
2175 /* Re-create seek event with changed/updated values */
2177 gst_event_unref (event);
2179 gst_event_new_seek (rate, format, flags,
2180 start_type, start, stop_type, stop);
2181 gst_event_set_seqnum (event, seqnum);
2185 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2187 /* have a backup in case seek fails */
2188 gst_segment_copy_into (&demux->segment, &oldsegment);
2190 GST_DEBUG_OBJECT (demux, "sending flush start");
2191 flush_event = gst_event_new_flush_start ();
2192 gst_event_set_seqnum (flush_event, seqnum);
2194 gst_adaptive_demux_push_src_event (demux, flush_event);
2196 gst_adaptive_demux_stop_tasks (demux, FALSE);
2197 gst_adaptive_demux_reset_tracks (demux);
2199 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2201 if (!IS_SNAP_SEEK (flags) && !(flags & GST_SEEK_FLAG_ACCURATE)) {
2202 /* If no accurate seeking was specified, we want to default to seeking to
2203 * the previous segment for efficient/fast playback. */
2204 flags |= GST_SEEK_FLAG_KEY_UNIT;
2207 if (IS_SNAP_SEEK (flags)) {
2208 GstAdaptiveDemux2Stream *default_stream = NULL;
2209 GstAdaptiveDemux2Stream *stream = NULL;
2212 * Handle snap seeks as follows:
2213 * 1) do the snap seeking a (random) active stream
2214 * 1.1) If none are active yet (early-seek), pick a random default one
2215 * 2) use the final position on this stream to seek
2216 * on the other streams to the same position
2218 * We can't snap at all streams at the same time as they might end in
2219 * different positions, so just pick one and align all others to that
2223 /* Pick a random active stream on which to do the stream seek */
2224 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2225 GstAdaptiveDemux2Stream *cand = iter->data;
2226 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2230 if (default_stream == NULL
2231 && gst_adaptive_demux2_stream_is_default_locked (cand))
2232 default_stream = cand;
2236 stream = default_stream;
2239 GstClockTimeDiff ts;
2240 GstSeekFlags stream_seek_flags = flags;
2242 /* snap-seek on the chosen stream and then
2243 * use the resulting position to seek on all streams */
2245 if (start_type != GST_SEEK_TYPE_NONE)
2248 ts = gst_segment_position_from_running_time (&demux->segment,
2249 GST_FORMAT_TIME, demux->priv->global_output_position);
2250 start_type = GST_SEEK_TYPE_SET;
2253 if (stop_type != GST_SEEK_TYPE_NONE)
2256 stop_type = GST_SEEK_TYPE_SET;
2257 ts = gst_segment_position_from_running_time (&demux->segment,
2258 GST_FORMAT_TIME, demux->priv->global_output_position);
2262 if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2263 ts, &ts) != GST_FLOW_OK) {
2264 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2267 /* replace event with a new one without snapping to seek on all streams */
2268 gst_event_unref (event);
2275 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2276 start_type, start, stop_type, stop);
2277 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2281 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2282 start, stop_type, stop, &update);
2285 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2287 ret = demux_class->seek (demux, event);
2291 /* Is there anything else we can do if it fails? */
2292 gst_segment_copy_into (&oldsegment, &demux->segment);
2294 demux->priv->segment_seqnum = seqnum;
2296 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2298 /* Resetting flow combiner */
2299 gst_flow_combiner_reset (demux->priv->flowcombiner);
2301 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2302 flush_event = gst_event_new_flush_stop (TRUE);
2303 gst_event_set_seqnum (flush_event, seqnum);
2304 gst_adaptive_demux_push_src_event (demux, flush_event);
2306 /* If the seek generated a new period, prepare it */
2307 if (!demux->input_period->prepared) {
2308 /* This can only happen on flushing seeks */
2309 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2310 gst_adaptive_demux_seek_to_input_period (demux);
2313 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2314 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2316 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2317 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2319 /* Reset the global output position (running time) for when the output loop restarts */
2320 demux->priv->global_output_position = 0;
2322 /* After a flushing seek, any instant-rate override is undone */
2323 demux->instant_rate_multiplier = 1.0;
2325 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2327 /* Restart the demux */
2328 gst_adaptive_demux_start_tasks (demux);
2331 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2332 GST_API_UNLOCK (demux);
2333 gst_event_unref (event);
2338 /* Returns TRUE if the stream has at least one selected track */
2340 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2345 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2346 GstAdaptiveDemuxTrack *track = tmp->data;
2348 if (track->selected)
2356 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2359 gboolean selection_handled = TRUE;
2361 GList *tracks = NULL;
2363 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2366 TRACKS_LOCK (demux);
2367 /* We can't do stream selection if we are migrating between periods */
2368 if (demux->input_period && demux->output_period != demux->input_period) {
2369 GST_WARNING_OBJECT (demux,
2370 "Stream selection while migrating between periods is not possible");
2371 TRACKS_UNLOCK (demux);
2374 /* Validate the streams and fill:
2375 * tracks : list of tracks corresponding to requested streams
2377 for (iter = streams; iter; iter = iter->next) {
2378 gchar *stream_id = (gchar *) iter->data;
2379 GstAdaptiveDemuxTrack *track;
2381 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2382 track = find_track_for_stream_id (demux->output_period, stream_id);
2384 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2385 selection_handled = FALSE;
2386 goto select_streams_done;
2388 tracks = g_list_append (tracks, track);
2389 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2392 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2393 * SCHEDULING THREAD */
2395 /* FIXME: We want to iterate all streams, mark them as deselected,
2396 * then iterate tracks and mark any streams that have at least 1
2397 * active output track, then loop over all streams again and start/stop
2400 /* Go over all tracks present and (de)select based on current selection */
2401 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2402 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2404 if (track->selected && !g_list_find (tracks, track)) {
2405 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2406 track->stream_id, track->active);
2407 track->selected = FALSE;
2408 track->draining = TRUE;
2409 } else if (!track->selected && g_list_find (tracks, track)) {
2410 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2412 track->selected = TRUE;
2416 /* Start or stop streams based on the updated track selection */
2417 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2418 GstAdaptiveDemux2Stream *stream = iter->data;
2421 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2422 gboolean should_be_running =
2423 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2425 if (!is_running && should_be_running) {
2426 GstClockTime output_running_ts = demux->priv->global_output_position;
2427 GstClockTime start_position;
2429 /* Calculate where we should start the stream, and then
2431 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2433 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2434 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2435 GST_TIME_ARGS (output_running_ts), &demux->segment);
2438 gst_segment_position_from_running_time (&demux->segment,
2439 GST_FORMAT_TIME, output_running_ts);
2441 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2443 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2444 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2446 stream->current_position = stream->start_position = start_position;
2447 stream->compute_segment = TRUE;
2449 /* If output has already begun, ensure we seek this segment
2450 * to the correct restart position when the download loop begins */
2451 if (output_running_ts != 0)
2452 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2454 /* Activate track pads for this stream */
2455 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2456 GstAdaptiveDemuxTrack *track =
2457 (GstAdaptiveDemuxTrack *) trackiter->data;
2458 gst_pad_set_active (track->sinkpad, TRUE);
2461 gst_adaptive_demux2_stream_start (stream);
2462 } else if (is_running && !should_be_running) {
2463 /* Stream should not be running and needs stopping */
2464 gst_adaptive_demux2_stream_stop (stream);
2466 /* Set all track sinkpads to inactive for this stream */
2467 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2468 GstAdaptiveDemuxTrack *track =
2469 (GstAdaptiveDemuxTrack *) trackiter->data;
2470 gst_pad_set_active (track->sinkpad, FALSE);
2475 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2477 select_streams_done:
2478 demux_update_buffering_locked (demux);
2479 demux_post_buffering_locked (demux);
2481 TRACKS_UNLOCK (demux);
2482 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2485 g_list_free (tracks);
2486 return selection_handled;
2490 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2494 gboolean selection_handled;
2496 if (GST_EVENT_SEQNUM (event) ==
2497 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2498 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2499 GST_EVENT_SEQNUM (event));
2503 gst_event_parse_select_streams (event, &streams);
2505 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2506 g_list_free_full (streams, g_free);
2508 gst_event_unref (event);
2509 return selection_handled;
2513 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2516 GstAdaptiveDemux *demux;
2518 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2520 switch (event->type) {
2521 case GST_EVENT_SEEK:
2523 guint32 seqnum = gst_event_get_seqnum (event);
2524 if (seqnum == demux->priv->segment_seqnum) {
2525 GST_LOG_OBJECT (pad,
2526 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2527 gst_event_unref (event);
2530 return gst_adaptive_demux_handle_seek_event (demux, event);
2532 case GST_EVENT_LATENCY:{
2533 /* Upstream and our internal source are irrelevant
2534 * for latency, and we should not fail here to
2535 * configure the latency */
2536 gst_event_unref (event);
2539 case GST_EVENT_QOS:{
2540 GstClockTimeDiff diff;
2541 GstClockTime timestamp;
2542 GstClockTime earliest_time;
2544 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2545 /* Only take into account lateness if late */
2547 earliest_time = timestamp + 2 * diff;
2549 earliest_time = timestamp;
2551 GST_OBJECT_LOCK (demux);
2552 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2553 earliest_time > demux->priv->qos_earliest_time) {
2554 demux->priv->qos_earliest_time = earliest_time;
2555 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2556 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2558 GST_OBJECT_UNLOCK (demux);
2561 case GST_EVENT_SELECT_STREAMS:
2563 return gst_adaptive_demux_handle_select_streams_event (demux, event);
2569 return gst_pad_event_default (pad, parent, event);
2573 gst_adaptive_demux_handle_query_seeking (GstAdaptiveDemux * demux,
2576 GstFormat fmt = GST_FORMAT_UNDEFINED;
2579 gboolean ret = FALSE;
2581 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2582 GST_INFO_OBJECT (demux,
2583 "Don't have manifest yet, can't answer seeking query");
2584 return FALSE; /* can't answer without manifest */
2587 GST_MANIFEST_LOCK (demux);
2589 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2590 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2591 if (fmt == GST_FORMAT_TIME) {
2592 GstClockTime duration;
2593 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2597 if (gst_adaptive_demux_is_live (demux)) {
2598 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2601 GST_MANIFEST_UNLOCK (demux);
2602 GST_INFO_OBJECT (demux, "can't answer seeking query");
2606 duration = demux->priv->duration;
2607 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2611 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2612 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2613 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2614 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2616 GST_MANIFEST_UNLOCK (demux);
2621 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2624 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2625 gboolean ret = FALSE;
2630 switch (query->type) {
2631 case GST_QUERY_DURATION:{
2633 GstClockTime duration = GST_CLOCK_TIME_NONE;
2635 gst_query_parse_duration (query, &fmt, NULL);
2637 if (gst_adaptive_demux_is_live (demux)) {
2638 /* We are able to answer this query: the duration is unknown */
2639 gst_query_set_duration (query, fmt, -1);
2644 if (fmt == GST_FORMAT_TIME
2645 && g_atomic_int_get (&demux->priv->have_manifest)) {
2647 GST_MANIFEST_LOCK (demux);
2648 duration = demux->priv->duration;
2649 GST_MANIFEST_UNLOCK (demux);
2651 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2652 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2657 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2658 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2661 case GST_QUERY_LATENCY:{
2662 gst_query_set_latency (query, FALSE, 0, -1);
2666 case GST_QUERY_SEEKING:
2667 ret = gst_adaptive_demux_handle_query_seeking (demux, query);
2671 GST_MANIFEST_LOCK (demux);
2673 /* TODO HLS can answer this differently it seems */
2674 if (demux->manifest_uri) {
2675 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2676 * playlist or the the uri of the last downlowaded fragment? */
2677 gst_query_set_uri (query, demux->manifest_uri);
2681 GST_MANIFEST_UNLOCK (demux);
2683 case GST_QUERY_SELECTABLE:
2684 gst_query_set_selectable (query, TRUE);
2688 /* Don't forward queries upstream because of the special nature of this
2689 * "demuxer", which relies on the upstream element only to be fed
2699 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
2701 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
2703 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
2705 switch (GST_QUERY_TYPE (query)) {
2706 case GST_QUERY_BUFFERING:
2709 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2711 if (!demux->output_period) {
2712 if (format != GST_FORMAT_TIME) {
2713 GST_DEBUG_OBJECT (demux,
2714 "No period setup yet, can't answer non-TIME buffering queries");
2718 GST_DEBUG_OBJECT (demux,
2719 "No period setup yet, but still answering buffering query");
2723 case GST_QUERY_SEEKING:
2725 /* Source pads might not be present early on which would cause the default
2726 * element query handler to fail, yet we can answer this query */
2727 return gst_adaptive_demux_handle_query_seeking (demux, query);
2733 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
2737 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2741 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2744 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2745 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2746 GST_SEEK_TYPE_NONE, 0);
2747 gst_adaptive_demux_handle_seek_event (demux, seek);
2752 /* Called when the scheduler starts, to kick off manifest updates
2753 * and stream downloads */
2755 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2759 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2761 iter = demux->input_period->streams;
2763 for (; iter; iter = g_list_next (iter)) {
2764 GstAdaptiveDemux2Stream *stream = iter->data;
2766 /* If we need to process this stream to discover tracks *OR* it has any
2767 * tracks which are selected, start it now */
2768 if ((stream->pending_tracks == TRUE)
2769 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2770 gst_adaptive_demux2_stream_start (stream);
2776 /* must be called with manifest_lock taken */
2778 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2780 if (!gst_adaptive_demux2_is_running (demux)) {
2781 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2785 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2786 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2787 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2789 TRACKS_LOCK (demux);
2790 demux->priv->flushing = FALSE;
2791 GST_DEBUG_OBJECT (demux, "Starting the output task");
2792 gst_task_start (demux->priv->output_task);
2793 TRACKS_UNLOCK (demux);
2796 /* must be called with manifest_lock taken */
2798 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2800 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2801 if (demux->priv->manifest_updates_cb != 0) {
2802 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2803 demux->priv->manifest_updates_cb);
2804 demux->priv->manifest_updates_cb = 0;
2808 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2810 /* must be called with manifest_lock taken */
2812 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2814 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2816 if (gst_adaptive_demux_is_live (demux)) {
2817 /* Task to periodically update the manifest */
2818 if (demux_class->requires_periodical_playlist_update (demux)) {
2819 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2820 if (demux->priv->manifest_updates_cb == 0) {
2821 demux->priv->manifest_updates_cb =
2822 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2823 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2829 /* must be called with manifest_lock taken
2830 * This function will temporarily release manifest_lock in order to join the
2832 * The api_lock will still protect it against other threads trying to modify
2833 * the demux element.
2836 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2838 GST_LOG_OBJECT (demux, "Stopping tasks");
2841 gst_adaptive_demux_stop_manifest_update_task (demux);
2843 TRACKS_LOCK (demux);
2844 if (demux->input_period)
2845 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2847 demux->priv->flushing = TRUE;
2848 g_cond_signal (&demux->priv->tracks_add);
2849 gst_task_stop (demux->priv->output_task);
2850 TRACKS_UNLOCK (demux);
2852 gst_task_join (demux->priv->output_task);
2854 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2857 /* must be called with manifest_lock taken */
2859 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2862 gboolean ret = TRUE;
2864 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2866 TRACKS_LOCK (demux);
2867 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2868 OutputSlot *slot = (OutputSlot *) iter->data;
2869 gst_event_ref (event);
2870 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2871 ret = ret & gst_pad_push_event (slot->pad, event);
2872 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2873 slot->pushed_timed_data = FALSE;
2875 TRACKS_UNLOCK (demux);
2876 gst_event_unref (event);
2880 /* must be called with manifest_lock taken */
2882 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2885 GST_DEBUG_OBJECT (stream,
2886 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2887 gst_caps_replace (&stream->pending_caps, caps);
2888 gst_caps_unref (caps);
2891 /* must be called with manifest_lock taken */
2893 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2896 GST_DEBUG_OBJECT (stream,
2897 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2898 gst_clear_tag_list (&stream->pending_tags);
2899 stream->pending_tags = tags;
2902 /* must be called with manifest_lock taken */
2904 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2907 stream->pending_events = g_list_append (stream->pending_events, event);
2911 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2917 /* Called when a stream needs waking after the manifest is updated */
2919 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2921 demux->priv->stream_waiting_for_manifest = TRUE;
2925 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2927 GstFlowReturn ret = GST_FLOW_OK;
2928 gboolean schedule_again = TRUE;
2930 GST_MANIFEST_LOCK (demux);
2931 demux->priv->manifest_updates_cb = 0;
2933 /* Updating playlist only needed for live playlists */
2934 if (!gst_adaptive_demux_is_live (demux)) {
2935 GST_MANIFEST_UNLOCK (demux);
2936 return G_SOURCE_REMOVE;
2939 GST_DEBUG_OBJECT (demux, "Updating playlist");
2940 ret = gst_adaptive_demux_update_manifest (demux);
2942 if (ret == GST_FLOW_EOS) {
2943 GST_MANIFEST_UNLOCK (demux);
2944 return G_SOURCE_REMOVE;
2947 if (ret == GST_FLOW_OK) {
2948 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2949 demux->priv->update_failed_count = 0;
2951 /* Wake up download tasks */
2952 if (demux->priv->stream_waiting_for_manifest) {
2955 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2956 GstAdaptiveDemux2Stream *stream = iter->data;
2957 gst_adaptive_demux2_stream_on_manifest_update (stream);
2959 demux->priv->stream_waiting_for_manifest = FALSE;
2962 demux->priv->update_failed_count++;
2964 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2965 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2966 gst_flow_get_name (ret));
2968 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2969 (_("Internal data stream error.")), ("Could not update playlist"));
2970 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2971 schedule_again = FALSE;
2975 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2976 gst_adaptive_demux_handle_lost_sync (demux);
2978 if (schedule_again) {
2979 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2981 demux->priv->manifest_updates_cb =
2982 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2983 klass->get_manifest_update_interval (demux) * GST_USECOND,
2984 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2987 GST_MANIFEST_UNLOCK (demux);
2989 return G_SOURCE_REMOVE;
2993 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2995 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2997 /* Loop for updating of the playlist. This periodically checks if
2998 * the playlist is updated and does so, then signals the streaming
2999 * thread in case it can continue downloading now. */
3001 /* block until the next scheduled update or the signal to quit this thread */
3002 GST_DEBUG_OBJECT (demux, "Started updates task");
3003 demux->priv->manifest_updates_cb =
3004 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3005 klass->get_manifest_update_interval (demux) * GST_USECOND,
3006 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3008 return G_SOURCE_REMOVE;
3012 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3013 GstAdaptiveDemuxTrack * track)
3017 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3018 OutputSlot *slot = (OutputSlot *) tmp->data;
3019 /* Incompatible output type */
3020 if (slot->type != track->type)
3023 /* Slot which is already assigned to this pending track */
3024 if (slot->pending_track == track)
3027 /* slot already used for another pending track */
3028 if (slot->pending_track != NULL)
3031 /* Current output track is of the same type and is draining */
3032 if (slot->track && slot->track->draining)
3039 /* TRACKS_LOCK taken */
3041 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3045 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3046 OutputSlot *slot = (OutputSlot *) tmp->data;
3048 if (slot->track == track)
3055 /* TRACKS_LOCK held */
3057 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3062 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3063 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3065 if (track->selected && !track->active)
3069 /* All selected tracks are active, created message */
3071 gst_message_new_streams_selected (GST_OBJECT (demux),
3072 demux->output_period->collection);
3073 GST_MESSAGE_SEQNUM (msg) = seqnum;
3074 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3075 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3076 if (track->active) {
3077 gst_message_streams_selected_add (msg, track->stream_object);
3085 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3088 GstAdaptiveDemuxTrack *track = slot->track;
3091 /* Send EVENT_STREAM_START */
3092 event = gst_event_new_stream_start (track->stream_id);
3093 if (demux->have_group_id)
3094 gst_event_set_group_id (event, demux->group_id);
3095 gst_event_set_stream_flags (event, track->flags);
3096 gst_event_set_stream (event, track->stream_object);
3097 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3099 gst_pad_push_event (slot->pad, event);
3101 /* Send EVENT_STREAM_COLLECTION */
3102 event = gst_event_new_stream_collection (demux->output_period->collection);
3103 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3105 gst_pad_push_event (slot->pad, event);
3107 /* Mark all sticky events for re-sending */
3108 gst_event_store_mark_all_undelivered (&track->sticky_events);
3112 * Called with TRACKS_LOCK taken
3115 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3118 guint requested_selection_seqnum;
3121 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3122 output slots vs active/draining tracks */
3123 requested_selection_seqnum =
3124 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3126 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3129 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3131 /* Go over all slots, and if they have a pending track that's no longer
3132 * selected, clear it so the slot can be reused */
3133 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3134 OutputSlot *slot = (OutputSlot *) tmp->data;
3136 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3137 GST_DEBUG_OBJECT (demux,
3138 "Removing deselected track '%s' as pending from output of current track '%s'",
3139 slot->pending_track->stream_id, slot->track->stream_id);
3140 gst_adaptive_demux_track_unref (slot->pending_track);
3141 slot->pending_track = NULL;
3145 /* Go over all tracks and create/re-assign/remove slots */
3146 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3147 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3149 if (track->selected) {
3150 OutputSlot *slot = find_slot_for_track (demux, track);
3152 /* 0. Track is selected and has a slot. Nothing to do */
3154 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3159 slot = find_replacement_slot_for_track (demux, track);
3161 /* 1. There is an existing slot of the same type which is currently
3162 * draining, assign this track as a replacement for it */
3163 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3164 if (slot->pending_track == NULL) {
3165 slot->pending_track = gst_adaptive_demux_track_ref (track);
3166 GST_DEBUG_OBJECT (demux,
3167 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3168 track->stream_id, track->period_num,
3169 slot->track->stream_id, slot->track->period_num);
3172 /* 2. There is no compatible replacement slot, create a new one */
3173 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3174 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3176 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3178 track->update_next_segment = TRUE;
3180 slot->track = gst_adaptive_demux_track_ref (track);
3181 track->active = TRUE;
3182 gst_adaptive_demux_send_initial_events (demux, slot);
3185 /* If we were draining this track, we no longer are */
3186 track->draining = FALSE;
3190 /* Finally check all slots have a current/pending track. If not remove it */
3191 for (tmp = demux->priv->outputs; tmp;) {
3192 OutputSlot *slot = (OutputSlot *) tmp->data;
3193 /* We should never has slots without target tracks */
3194 g_assert (slot->track);
3195 if (slot->track->draining && !slot->pending_track) {
3196 GstAdaptiveDemux2Stream *stream;
3198 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3199 slot->track->stream_id);
3200 slot->track->active = FALSE;
3202 /* If the stream feeding this track is stopped, flush and clear
3203 * the track now that it's going inactive. If the stream was not
3204 * found, it means we advanced past that period already (and the
3205 * stream was stopped and discarded) */
3206 stream = find_stream_for_track_locked (demux, slot->track);
3207 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3208 gst_adaptive_demux_track_flush (slot->track);
3210 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3211 gst_adaptive_demux_output_slot_free (demux, slot);
3216 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3217 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3219 TRACKS_UNLOCK (demux);
3220 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3221 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3222 TRACKS_LOCK (demux);
3226 /* TRACKS_LOCK held */
3228 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3231 GstAdaptiveDemuxPeriod *previous_period;
3232 GstStreamCollection *collection;
3234 /* Grab the next period, should be demux->periods->next->data */
3235 previous_period = g_queue_pop_head (demux->priv->periods);
3237 /* Remove ref held by demux->output_period */
3238 gst_adaptive_demux_period_unref (previous_period);
3239 demux->output_period =
3240 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3242 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3243 demux->output_period->period_num);
3245 /* We can now post the collection of the new period */
3246 collection = demux->output_period->collection;
3247 TRACKS_UNLOCK (demux);
3248 gst_element_post_message (GST_ELEMENT_CAST (demux),
3249 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3250 TRACKS_LOCK (demux);
3252 /* Unselect all tracks of the previous period */
3253 for (iter = previous_period->tracks; iter; iter = iter->next) {
3254 GstAdaptiveDemuxTrack *track = iter->data;
3255 if (track->selected) {
3256 track->selected = FALSE;
3257 track->draining = TRUE;
3261 /* Force a selection re-check */
3262 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3263 check_and_handle_selection_update_locked (demux);
3265 /* Remove the final ref on the previous period now that we have done the switch */
3266 gst_adaptive_demux_period_unref (previous_period);
3271 /* Called with TRACKS_LOCK taken */
3273 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3276 GstAdaptiveDemuxTrack *track = slot->track;
3278 gboolean pending_is_ready;
3279 GstAdaptiveDemux2Stream *stream;
3281 /* If we have a pending track for this slot, the current track should be
3282 * draining and no longer selected */
3283 g_assert (track->draining && !track->selected);
3285 /* If we're draining, check if the pending track has enough data *or* that
3286 we've already drained out entirely */
3288 (slot->pending_track->level_time >=
3289 slot->pending_track->buffering_threshold);
3290 pending_is_ready |= slot->pending_track->eos;
3292 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3293 GST_DEBUG_OBJECT (demux,
3294 "Replacement track '%s' doesn't have enough data for switching yet",
3295 slot->pending_track->stream_id);
3299 GST_DEBUG_OBJECT (demux,
3300 "Pending replacement track has enough data, switching");
3301 track->active = FALSE;
3302 track->draining = FALSE;
3304 /* If the stream feeding this track is stopped, flush and clear
3305 * the track now that it's going inactive. If the stream was not
3306 * found, it means we advanced past that period already (and the
3307 * stream was stopped and discarded) */
3308 stream = find_stream_for_track_locked (demux, track);
3309 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3310 gst_adaptive_demux_track_flush (track);
3312 gst_adaptive_demux_track_unref (track);
3313 /* We steal the reference of pending_track */
3314 track = slot->track = slot->pending_track;
3315 slot->pending_track = NULL;
3316 slot->track->active = TRUE;
3318 /* Make sure the track segment will start at the current position */
3319 track->update_next_segment = TRUE;
3321 /* Send stream start and collection, and schedule sticky events */
3322 gst_adaptive_demux_send_initial_events (demux, slot);
3324 /* Can we emit the streams-selected message now ? */
3326 all_selected_tracks_are_active (demux,
3327 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3329 TRACKS_UNLOCK (demux);
3330 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3331 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3332 TRACKS_LOCK (demux);
3338 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3341 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3342 gboolean wait_for_data = FALSE;
3343 gboolean all_tracks_empty;
3346 GST_DEBUG_OBJECT (demux, "enter");
3348 TRACKS_LOCK (demux);
3350 /* Check if stopping */
3351 if (demux->priv->flushing) {
3352 ret = GST_FLOW_FLUSHING;
3356 /* If the selection changed, handle it */
3357 check_and_handle_selection_update_locked (demux);
3361 global_output_position = GST_CLOCK_STIME_NONE;
3362 all_tracks_empty = TRUE;
3364 if (wait_for_data) {
3365 GST_DEBUG_OBJECT (demux, "Waiting for data");
3366 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3367 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3368 if (demux->priv->flushing) {
3369 ret = GST_FLOW_FLUSHING;
3372 wait_for_data = FALSE;
3375 /* Grab/Recalculate current global output position
3376 * This is the minimum pending output position of all tracks used for output
3378 * If there is a track which is empty and not EOS, wait for it to receive data
3379 * then recalculate global output position.
3381 * This also pushes downstream all non-timed data that might be present.
3383 * IF all tracks are EOS : stop task
3385 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3386 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3387 OutputSlot *slot = (OutputSlot *) tmp->data;
3388 GstAdaptiveDemuxTrack *track;
3390 /* If there is a pending track, Check if it's time to switch to it */
3391 if (slot->pending_track)
3392 handle_slot_pending_track_switch_locked (demux, slot);
3394 track = slot->track;
3396 if (!track->active) {
3397 /* Note: Edward: I can't see in what cases we would end up with inactive
3398 tracks assigned to slots. */
3399 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3400 g_assert (track->active);
3404 if (track->next_position == GST_CLOCK_STIME_NONE) {
3405 gst_adaptive_demux_track_update_next_position (track);
3408 GST_TRACE_OBJECT (demux,
3409 "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3410 track->stream_id, track->period_num,
3411 GST_STIME_ARGS (track->next_position));
3413 if (track->next_position != GST_CLOCK_STIME_NONE) {
3414 if (global_output_position == GST_CLOCK_STIME_NONE)
3415 global_output_position = track->next_position;
3417 global_output_position =
3418 MIN (global_output_position, track->next_position);
3419 track->waiting_add = FALSE;
3420 all_tracks_empty = FALSE;
3421 } else if (!track->eos) {
3422 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3423 track->stream_id, track->period_num);
3424 all_tracks_empty = FALSE;
3425 wait_for_data = track->waiting_add = TRUE;
3427 GST_DEBUG_OBJECT (demux,
3428 "Track %s (period %u) is EOS, not waiting for timed data",
3429 track->stream_id, track->period_num);
3431 if (gst_queue_array_get_length (track->queue) > 0) {
3432 all_tracks_empty = FALSE;
3440 if (all_tracks_empty && demux->output_period->has_next_period) {
3441 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3442 demux->output_period->period_num);
3443 if (!gst_adaptive_demux_advance_output_period (demux)) {
3444 /* Failed to move to next period, error out */
3445 ret = GST_FLOW_ERROR;
3448 /* Restart the loop */
3452 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3453 GST_STIME_ARGS (global_output_position));
3457 * We know all active tracks have pending timed data
3458 * * while track next_position <= global output position
3459 * * push pending data
3460 * * Update track next_position
3461 * * recalculate global output position
3462 * * Pop next pending data from track and update pending position
3465 gboolean need_restart = FALSE;
3467 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3468 OutputSlot *slot = (OutputSlot *) tmp->data;
3469 GstAdaptiveDemuxTrack *track = slot->track;
3471 GST_LOG_OBJECT (track->element,
3472 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3473 " global_output_position:%" GST_STIME_FORMAT, track->active,
3474 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3475 GST_STIME_ARGS (global_output_position));
3480 while (global_output_position == GST_CLOCK_STIME_NONE
3481 || !slot->pushed_timed_data
3482 || ((track->next_position != GST_CLOCK_STIME_NONE)
3483 && track->next_position <= global_output_position)
3484 || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3485 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3488 GST_DEBUG_OBJECT (demux,
3489 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3490 track->stream_id, track->period_num, track->eos,
3491 slot->pushed_timed_data);
3492 /* This should only happen if the track is EOS, or exactly in between
3493 * the parser outputting segment/caps before buffers. */
3494 g_assert (track->eos || !slot->pushed_timed_data);
3496 /* If we drained the track, but there's a pending track on the slot
3497 * loop again to activate it */
3498 if (slot->pending_track) {
3499 GST_DEBUG_OBJECT (demux,
3500 "Track '%s' (period %u) drained, but has a pending track to activate",
3501 track->stream_id, track->period_num);
3507 demux_update_buffering_locked (demux);
3508 demux_post_buffering_locked (demux);
3509 TRACKS_UNLOCK (demux);
3511 GST_DEBUG_OBJECT (demux,
3512 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3513 track->period_num, mo);
3515 if (GST_IS_EVENT (mo)) {
3516 GstEvent *event = (GstEvent *) mo;
3517 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3518 slot->pushed_timed_data = TRUE;
3519 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3520 /* If there is a pending next period, don't send the EOS */
3521 if (demux->output_period->has_next_period) {
3522 GST_LOG_OBJECT (demux,
3523 "Dropping EOS on track '%s' (period %u) before next period",
3524 track->stream_id, track->period_num);
3525 gst_event_store_mark_delivered (&track->sticky_events, event);
3526 gst_event_unref (event);
3528 /* We'll need to re-check if all tracks are empty again above */
3529 need_restart = TRUE;
3533 if (event != NULL) {
3534 gst_pad_push_event (slot->pad, gst_event_ref (event));
3536 if (GST_EVENT_IS_STICKY (event))
3537 gst_event_store_mark_delivered (&track->sticky_events, event);
3538 gst_event_unref (event);
3540 } else if (GST_IS_BUFFER (mo)) {
3541 GstBuffer *buffer = (GstBuffer *) mo;
3543 if (track->output_discont) {
3544 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3545 buffer = gst_buffer_make_writable (buffer);
3546 GST_DEBUG_OBJECT (slot->pad,
3547 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3549 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3551 track->output_discont = FALSE;
3553 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3555 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3556 slot->pad, slot->flow_ret);
3557 GST_DEBUG_OBJECT (slot->pad,
3558 "track %s (period %u) push returned %s (combined %s)",
3559 track->stream_id, track->period_num,
3560 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3561 slot->pushed_timed_data = TRUE;
3563 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3566 TRACKS_LOCK (demux);
3567 gst_adaptive_demux_track_update_next_position (track);
3569 if (ret != GST_FLOW_OK)
3574 /* Store global output position */
3575 if (global_output_position != GST_CLOCK_STIME_NONE) {
3576 demux->priv->global_output_position = global_output_position;
3578 /* And see if any streams need to be woken for more input */
3579 gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3580 global_output_position);
3586 if (global_output_position == GST_CLOCK_STIME_NONE) {
3587 if (!demux->priv->flushing) {
3588 GST_DEBUG_OBJECT (demux,
3589 "Pausing output task after reaching NONE global_output_position");
3590 gst_task_pause (demux->priv->output_task);
3594 TRACKS_UNLOCK (demux);
3595 GST_DEBUG_OBJECT (demux, "leave");
3600 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3601 /* If the flushing flag is set, then the task is being
3602 * externally stopped, so don't go to pause(), otherwise we
3603 * should so we don't keep spinning */
3604 if (!demux->priv->flushing) {
3605 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3606 gst_flow_get_name (ret));
3607 gst_task_pause (demux->priv->output_task);
3610 TRACKS_UNLOCK (demux);
3612 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3613 GstEvent *eos = gst_event_new_eos ();
3615 if (ret != GST_FLOW_EOS) {
3616 GST_ELEMENT_FLOW_ERROR (demux, ret);
3619 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3620 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3621 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3622 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3624 gst_adaptive_demux_push_src_event (demux, eos);
3631 /* must be called from the scheduler */
3633 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3635 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3638 return klass->is_live (demux);
3643 handle_manifest_download_complete (DownloadRequest * request,
3644 DownloadRequestState state, GstAdaptiveDemux * demux)
3646 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3648 GstFlowReturn result;
3650 g_free (demux->manifest_base_uri);
3651 g_free (demux->manifest_uri);
3653 if (request->redirect_permanent && request->redirect_uri) {
3654 demux->manifest_uri = g_strdup (request->redirect_uri);
3655 demux->manifest_base_uri = NULL;
3657 demux->manifest_uri = g_strdup (request->uri);
3658 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3661 buffer = download_request_take_buffer (request);
3663 /* We should always have a buffer since this function is the non-error
3664 * callback for the download */
3667 result = klass->update_manifest_data (demux, buffer);
3668 gst_buffer_unref (buffer);
3670 /* FIXME: Should the manifest uri vars be reverted to original
3671 * values if updating fails? */
3673 if (result == GST_FLOW_OK) {
3674 GstClockTime duration;
3675 /* Send an updated duration message */
3676 duration = klass->get_duration (demux);
3677 if (duration != GST_CLOCK_TIME_NONE) {
3678 GST_DEBUG_OBJECT (demux,
3679 "Sending duration message : %" GST_TIME_FORMAT,
3680 GST_TIME_ARGS (duration));
3681 gst_element_post_message (GST_ELEMENT (demux),
3682 gst_message_new_duration_changed (GST_OBJECT (demux)));
3684 GST_DEBUG_OBJECT (demux,
3685 "Duration unknown, can not send the duration message");
3688 /* If a manifest changes it's liveness or periodic updateness, we need
3689 * to start/stop the manifest update task appropriately */
3690 /* Keep this condition in sync with the one in
3691 * gst_adaptive_demux_start_manifest_update_task()
3693 if (gst_adaptive_demux_is_live (demux) &&
3694 klass->requires_periodical_playlist_update (demux)) {
3695 gst_adaptive_demux_start_manifest_update_task (demux);
3697 gst_adaptive_demux_stop_manifest_update_task (demux);
3703 handle_manifest_download_failure (DownloadRequest * request,
3704 DownloadRequestState state, GstAdaptiveDemux * demux)
3706 GST_FIXME_OBJECT (demux, "Manifest download failed.");
3707 /* Retry or error out here */
3710 static GstFlowReturn
3711 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3713 DownloadRequest *request;
3714 GstFlowReturn ret = GST_FLOW_OK;
3715 GError *error = NULL;
3717 request = download_request_new_uri (demux->manifest_uri);
3719 download_request_set_callbacks (request,
3720 (DownloadRequestEventCallback) handle_manifest_download_complete,
3721 (DownloadRequestEventCallback) handle_manifest_download_failure,
3724 if (!downloadhelper_submit_request (demux->download_helper, NULL,
3725 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3728 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3729 ("Failed to download manifest: %s", error->message), (NULL));
3730 g_clear_error (&error);
3732 ret = GST_FLOW_NOT_LINKED;
3738 /* must be called with manifest_lock taken */
3740 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3742 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3745 ret = klass->update_manifest (demux);
3750 /* must be called with manifest_lock taken */
3752 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3754 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3755 gboolean ret = FALSE;
3757 if (klass->has_next_period)
3758 ret = klass->has_next_period (demux);
3759 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3763 /* must be called with manifest_lock taken */
3765 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3767 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3768 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3770 g_return_if_fail (klass->advance_period != NULL);
3772 GST_DEBUG_OBJECT (demux, "Advancing to next period");
3773 /* FIXME : no return value ? What if it fails ? */
3774 klass->advance_period (demux);
3776 if (previous_period == demux->input_period) {
3777 GST_ERROR_OBJECT (demux, "Advancing period failed");
3781 /* Stop the previous period stream tasks */
3782 gst_adaptive_demux_period_stop_tasks (previous_period);
3784 gst_adaptive_demux_update_collection (demux, demux->input_period);
3785 /* Figure out a pre-emptive selection based on the output period selection */
3786 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3787 demux->output_period);
3789 gst_adaptive_demux_prepare_streams (demux, FALSE);
3790 gst_adaptive_demux_start_tasks (demux);
3794 * gst_adaptive_demux_get_monotonic_time:
3795 * Returns: a monotonically increasing time, using the system realtime clock
3798 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3800 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3801 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3805 * gst_adaptive_demux_get_client_now_utc:
3806 * @demux: #GstAdaptiveDemux
3807 * Returns: the client's estimate of UTC
3809 * Used to find the client's estimate of UTC, using the system realtime clock.
3812 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3814 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3818 * gst_adaptive_demux_is_running
3819 * @demux: #GstAdaptiveDemux
3820 * Returns: whether the demuxer is processing data
3822 * Returns FALSE if shutdown has started (transitioning down from
3823 * PAUSED), otherwise TRUE.
3826 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3828 return g_atomic_int_get (&demux->running);
3832 * gst_adaptive_demux_get_qos_earliest_time:
3834 * Returns: The QOS earliest time
3839 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3841 GstClockTime earliest;
3843 GST_OBJECT_LOCK (demux);
3844 earliest = demux->priv->qos_earliest_time;
3845 GST_OBJECT_UNLOCK (demux);
3851 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3852 GstAdaptiveDemux2Stream * stream)
3854 g_return_val_if_fail (demux && stream, FALSE);
3856 /* FIXME : Migrate to parent */
3857 g_return_val_if_fail (stream->demux == NULL, FALSE);
3859 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3861 TRACKS_LOCK (demux);
3862 if (demux->input_period->prepared) {
3863 GST_ERROR_OBJECT (demux,
3864 "Attempted to add streams but no new period was created");
3865 TRACKS_UNLOCK (demux);
3868 stream->demux = demux;
3869 stream->period = demux->input_period;
3870 demux->input_period->streams =
3871 g_list_append (demux->input_period->streams, stream);
3873 if (stream->tracks) {
3875 for (iter = stream->tracks; iter; iter = iter->next)
3876 if (!gst_adaptive_demux_period_add_track (demux->input_period,
3877 (GstAdaptiveDemuxTrack *) iter->data)) {
3878 GST_ERROR_OBJECT (demux, "Failed to add track elements");
3879 TRACKS_UNLOCK (demux);
3883 TRACKS_UNLOCK (demux);
3887 /* Return the current playback rate including any instant rate multiplier */
3889 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3892 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3893 rate = demux->segment.rate * demux->instant_rate_multiplier;
3894 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);