3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
27 * What is an adaptive demuxer?
28 * Adaptive demuxers are special demuxers in the sense that they don't
29 * actually demux data received from upstream but download the data
32 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
33 * a set of fragments. The manifest describes the available media and
34 * the sequence of fragments to use. Each fragments contains a small
35 * part of the media (typically only a few seconds). It is possible for
36 * the manifest to have the same media available in different configurations
37 * (bitrates for example) so that the client can select the one that
38 * best suits its scenario (network fluctuation, hardware requirements...).
39 * It is possible to switch from one representation of the media to another
40 * during playback. That's why it is called 'adaptive', because it can be
41 * adapted to the client's needs.
43 * Architectural overview:
44 * The manifest is received by the demuxer in its sink pad and, upon receiving
45 * EOS, it parses the manifest and exposes the streams available in it. For
46 * each stream a source element will be created and will download the list
47 * of fragments one by one. Once a fragment is finished downloading, the next
48 * URI is set to the source element and it starts fetching it and pushing
49 * through the stream's pad. This implies that each stream is independent from
50 * each other as it runs on a separate thread.
52 * After downloading each fragment, the download rate of it is calculated and
53 * the demuxer has a chance to switch to a different bitrate if needed. The
54 * switch can be done by simply pushing a new caps before the next fragment
55 * when codecs are the same, or by exposing a new pad group if it needs
59 * - Not linked streams: Streams that are not-linked have their download threads
60 * interrupted to save network bandwidth. When they are
61 * relinked a reconfigure event is received and the
62 * stream is restarted.
65 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
66 * about the intrinsics of the subclass formats, so the subclasses are
67 * resposible for maintaining the manifest data structures and stream
73 The following rules were observed while implementing MT safety in adaptive demux:
74 1. If a variable is accessed from multiple threads and at least one thread
75 writes to it, then all the accesses needs to be done from inside a critical section.
76 2. If thread A wants to join thread B then at the moment it calls gst_task_join
77 it must not hold any mutexes that thread B might take.
79 Adaptive demux API can be called from several threads. More, adaptive demux
80 starts some threads to monitor the download of fragments. In order to protect
81 accesses to shared variables (demux and streams) all the API functions that
82 can be run in different threads will need to get a mutex (manifest_lock)
83 when they start and release it when they end. Because some of those functions
84 can indirectly call other API functions (eg they can generate events or messages
85 that are processed in the same thread) the manifest_lock must be recursive.
87 The manifest_lock will serialize the public API making access to shared
88 variables safe. But some of these functions will try at some moment to join
89 threads created by adaptive demux, or to change the state of src elements
90 (which will block trying to join the src element streaming thread). Because
91 of rule 2, those functions will need to release the manifest_lock during the
92 call of gst_task_join. During this time they can be interrupted by other API calls.
93 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
94 is called and this will join all threads. In order to prevent interruptions
95 during such period, all the API functions will also use a second lock: api_lock.
96 This will be taken at the beginning of the function and released at the end,
97 but this time this lock will not be temporarily released during join.
98 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
99 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
100 to hold it while joining the threads or changing the src element state. The
101 api_lock will serialise all external requests to adaptive demux. In order to
102 avoid deadlocks, if a function needs to acquire both manifest and api locks,
103 the api_lock will be taken first and the manifest_lock second.
105 By using the api_lock a thread is protected against other API calls. But when
106 temporarily dropping the manifest_lock, it will be vulnerable to changes from
107 threads that use only the manifest_lock and not the api_lock. These threads run
108 one of the following functions: gst_adaptive_demux_stream_download_loop,
109 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
110 that all operations during an API call are not impacted by other writes, the
111 above mentioned functions must check a cancelled flag every time they reacquire
112 the manifest_lock. If the flag is set, they must exit immediately, without
113 performing any changes on the shared data. In this way, an API call (eg seek
114 request) can set the cancel flag before releasing the manifest_lock and be sure
115 that the demux object and its streams are not changed by anybody else.
122 #include "gstadaptivedemux.h"
123 #include "gst/gst-i18n-plugin.h"
124 #include <gst/base/gstadapter.h>
126 GST_DEBUG_CATEGORY (adaptivedemux_debug);
127 #define GST_CAT_DEFAULT adaptivedemux_debug
129 #define GST_ADAPTIVE_DEMUX_GET_PRIVATE(obj) \
130 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_ADAPTIVE_DEMUX, \
131 GstAdaptiveDemuxPrivate))
133 #define MAX_DOWNLOAD_ERROR_COUNT 3
134 #define DEFAULT_FAILED_COUNT 3
135 #define DEFAULT_CONNECTION_SPEED 0
136 #define DEFAULT_BITRATE_LIMIT 0.8
137 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */
138 #define NUM_LOOKBACK_FRAGMENTS 3
140 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
141 #define GST_MANIFEST_LOCK(d) g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d));
142 #define GST_MANIFEST_UNLOCK(d) g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d));
144 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
145 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
146 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
148 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
149 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
150 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
155 PROP_CONNECTION_SPEED,
160 enum GstAdaptiveDemuxFlowReturn
162 GST_ADAPTIVE_DEMUX_FLOW_SWITCH = GST_FLOW_CUSTOM_SUCCESS_2 + 1
165 struct _GstAdaptiveDemuxPrivate
167 GstAdapter *input_adapter; /* protected by manifest_lock */
168 gboolean have_manifest; /* protected by manifest_lock */
170 GList *old_streams; /* protected by manifest_lock */
172 GstTask *updates_task; /* MT safe */
173 GRecMutex updates_lock;
174 GMutex updates_timed_lock;
175 GCond updates_timed_cond; /* protected by updates_timed_lock */
176 gboolean stop_updates_task; /* protected by updates_timed_lock */
178 /* used only from updates_task, no need to protect it */
179 gint update_failed_count;
181 guint32 segment_seqnum; /* protected by manifest_lock */
183 /* main lock used to protect adaptive demux and all its streams.
184 * It serializes the adaptive demux public API.
186 GRecMutex manifest_lock;
188 /* condition to wait for manifest updates on a live stream.
189 * In order to signal the manifest_cond, the caller needs to hold both
190 * manifest_lock and manifest_update_lock (taken in this order)
193 GMutex manifest_update_lock;
197 /* Protects demux and stream segment information
198 * Needed because seeks can update segment information
199 * without needing to stop tasks when they just want to
200 * update the segment boundaries */
204 typedef struct _GstAdaptiveDemuxTimer
206 volatile gint ref_count;
211 } GstAdaptiveDemuxTimer;
213 static GstBinClass *parent_class = NULL;
214 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
215 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
216 GstAdaptiveDemuxClass * klass);
217 static void gst_adaptive_demux_finalize (GObject * object);
218 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
219 element, GstStateChange transition);
221 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
223 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
225 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
226 GstObject * parent, GstBuffer * buffer);
227 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
229 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
233 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
235 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
236 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
238 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
239 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
240 gboolean first_and_live);
241 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
242 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
243 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
244 GstClockTime ts, GstClockTime * final_ts);
245 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
246 demux, GstAdaptiveDemuxStream * stream);
247 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
248 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
250 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
251 GstAdaptiveDemuxStream * stream);
253 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
254 GstAdaptiveDemuxStream * stream);
255 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
258 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
259 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
260 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
262 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
264 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
267 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
268 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux);
269 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
272 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
273 stream, GstFlowReturn ret, GError * err);
275 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
276 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
278 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
279 GstAdaptiveDemuxStream * stream);
281 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
282 GstAdaptiveDemuxStream * stream, GstClockTime duration);
284 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
285 GstClockTime end_time);
286 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
287 GstClockTime time, GstClockID id, gpointer user_data);
289 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
290 * method to get to the padtemplates */
292 gst_adaptive_demux_get_type (void)
294 static volatile gsize type = 0;
296 if (g_once_init_enter (&type)) {
298 static const GTypeInfo info = {
299 sizeof (GstAdaptiveDemuxClass),
302 (GClassInitFunc) gst_adaptive_demux_class_init,
305 sizeof (GstAdaptiveDemux),
307 (GInstanceInitFunc) gst_adaptive_demux_init,
310 _type = g_type_register_static (GST_TYPE_BIN,
311 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
312 g_once_init_leave (&type, _type);
318 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
319 const GValue * value, GParamSpec * pspec)
321 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
323 GST_API_LOCK (demux);
324 GST_MANIFEST_LOCK (demux);
327 case PROP_CONNECTION_SPEED:
328 demux->connection_speed = g_value_get_uint (value) * 1000;
329 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
330 demux->connection_speed);
332 case PROP_BITRATE_LIMIT:
333 demux->bitrate_limit = g_value_get_float (value);
336 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340 GST_MANIFEST_UNLOCK (demux);
341 GST_API_UNLOCK (demux);
345 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
346 GValue * value, GParamSpec * pspec)
348 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
350 GST_MANIFEST_LOCK (demux);
353 case PROP_CONNECTION_SPEED:
354 g_value_set_uint (value, demux->connection_speed / 1000);
356 case PROP_BITRATE_LIMIT:
357 g_value_set_float (value, demux->bitrate_limit);
360 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
364 GST_MANIFEST_UNLOCK (demux);
368 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
370 GObjectClass *gobject_class;
371 GstElementClass *gstelement_class;
372 GstBinClass *gstbin_class;
374 gobject_class = G_OBJECT_CLASS (klass);
375 gstelement_class = GST_ELEMENT_CLASS (klass);
376 gstbin_class = GST_BIN_CLASS (klass);
378 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
379 "Base Adaptive Demux");
381 parent_class = g_type_class_peek_parent (klass);
382 g_type_class_add_private (klass, sizeof (GstAdaptiveDemuxPrivate));
384 gobject_class->set_property = gst_adaptive_demux_set_property;
385 gobject_class->get_property = gst_adaptive_demux_get_property;
386 gobject_class->finalize = gst_adaptive_demux_finalize;
388 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
389 g_param_spec_uint ("connection-speed", "Connection Speed",
390 "Network connection speed in kbps (0 = calculate from downloaded"
391 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
392 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
395 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
396 g_param_spec_float ("bitrate-limit",
397 "Bitrate limit in %",
398 "Limit of the available bitrate to use when switching to alternates.",
399 0, 1, DEFAULT_BITRATE_LIMIT,
400 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402 gstelement_class->change_state = gst_adaptive_demux_change_state;
404 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
406 klass->data_received = gst_adaptive_demux_stream_data_received_default;
407 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
408 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
412 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
413 GstAdaptiveDemuxClass * klass)
415 GstPadTemplate *pad_template;
416 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
417 GObjectClass *gobject_class;
419 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
421 demux->priv = GST_ADAPTIVE_DEMUX_GET_PRIVATE (demux);
422 demux->priv->input_adapter = gst_adapter_new ();
423 demux->downloader = gst_uri_downloader_new ();
424 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
425 demux->priv->segment_seqnum = gst_util_seqnum_next ();
426 demux->have_group_id = FALSE;
427 demux->group_id = G_MAXUINT;
429 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
431 demux->realtime_clock = gst_system_clock_obtain ();
432 g_assert (demux->realtime_clock != NULL);
433 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
434 if (g_object_class_find_property (gobject_class, "clock-type")) {
435 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
437 GST_WARNING_OBJECT (demux,
438 "System clock does not have clock-type property");
440 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
441 demux->clock_offset = 0;
444 GstClockTime rtc_now;
447 utc_now = g_date_time_new_now_utc ();
448 rtc_now = gst_clock_get_time (demux->realtime_clock);
449 g_date_time_to_timeval (utc_now, >v);
450 demux->clock_offset =
451 gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
452 GST_TIME_AS_USECONDS (rtc_now);
453 g_date_time_unref (utc_now);
455 g_rec_mutex_init (&demux->priv->updates_lock);
456 demux->priv->updates_task =
457 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
459 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
461 g_mutex_init (&demux->priv->updates_timed_lock);
462 g_cond_init (&demux->priv->updates_timed_cond);
464 g_cond_init (&demux->priv->manifest_cond);
465 g_mutex_init (&demux->priv->manifest_update_lock);
467 g_rec_mutex_init (&demux->priv->manifest_lock);
468 g_mutex_init (&demux->priv->api_lock);
469 g_mutex_init (&demux->priv->segment_lock);
472 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
473 g_return_if_fail (pad_template != NULL);
475 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
476 gst_pad_set_event_function (demux->sinkpad,
477 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
478 gst_pad_set_chain_function (demux->sinkpad,
479 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
482 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
483 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
485 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
489 gst_adaptive_demux_finalize (GObject * object)
491 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
492 GstAdaptiveDemuxPrivate *priv = demux->priv;
494 GST_DEBUG_OBJECT (object, "finalize");
496 g_object_unref (priv->input_adapter);
497 g_object_unref (demux->downloader);
499 g_mutex_clear (&priv->updates_timed_lock);
500 g_cond_clear (&priv->updates_timed_cond);
501 g_mutex_clear (&demux->priv->manifest_update_lock);
502 g_cond_clear (&demux->priv->manifest_cond);
503 g_object_unref (priv->updates_task);
504 g_rec_mutex_clear (&priv->updates_lock);
505 g_rec_mutex_clear (&demux->priv->manifest_lock);
506 g_mutex_clear (&demux->priv->api_lock);
507 g_mutex_clear (&demux->priv->segment_lock);
508 if (demux->realtime_clock) {
509 gst_object_unref (demux->realtime_clock);
510 demux->realtime_clock = NULL;
513 G_OBJECT_CLASS (parent_class)->finalize (object);
516 static GstStateChangeReturn
517 gst_adaptive_demux_change_state (GstElement * element,
518 GstStateChange transition)
520 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
521 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
523 GST_API_LOCK (demux);
525 switch (transition) {
526 case GST_STATE_CHANGE_PAUSED_TO_READY:
527 GST_MANIFEST_LOCK (demux);
528 gst_adaptive_demux_reset (demux);
529 GST_MANIFEST_UNLOCK (demux);
535 /* this must be run without MANIFEST_LOCK taken.
536 * For PLAYING to PLAYING state changes, it will want to take a lock in
537 * src element and that lock is held while the streaming thread is running.
538 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
540 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
542 GST_API_UNLOCK (demux);
547 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
550 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
551 GstAdaptiveDemuxClass *demux_class;
554 switch (event->type) {
555 case GST_EVENT_FLUSH_STOP:
556 GST_API_LOCK (demux);
557 GST_MANIFEST_LOCK (demux);
559 gst_adaptive_demux_reset (demux);
561 ret = gst_pad_event_default (pad, parent, event);
563 GST_MANIFEST_UNLOCK (demux);
564 GST_API_UNLOCK (demux);
572 GstBuffer *manifest_buffer;
574 GST_API_LOCK (demux);
575 GST_MANIFEST_LOCK (demux);
577 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
579 available = gst_adapter_available (demux->priv->input_adapter);
581 if (available == 0) {
582 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
583 ret = gst_pad_event_default (pad, parent, event);
585 GST_MANIFEST_UNLOCK (demux);
586 GST_API_UNLOCK (demux);
591 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
593 /* Need to get the URI to use it as a base to generate the fragment's
595 query = gst_query_new_uri ();
596 query_res = gst_pad_peer_query (pad, query);
598 gchar *uri, *redirect_uri;
601 gst_query_parse_uri (query, &uri);
602 gst_query_parse_uri_redirection (query, &redirect_uri);
603 gst_query_parse_uri_redirection_permanent (query, &permanent);
605 if (permanent && redirect_uri) {
606 demux->manifest_uri = redirect_uri;
607 demux->manifest_base_uri = NULL;
610 demux->manifest_uri = uri;
611 demux->manifest_base_uri = redirect_uri;
614 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
615 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
617 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
619 gst_query_unref (query);
621 /* Let the subclass parse the manifest */
623 gst_adapter_take_buffer (demux->priv->input_adapter, available);
624 if (!demux_class->process_manifest (demux, manifest_buffer)) {
625 /* In most cases, this will happen if we set a wrong url in the
626 * source element and we have received the 404 HTML response instead of
628 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
632 demux->priv->have_manifest = TRUE;
634 gst_buffer_unref (manifest_buffer);
636 gst_element_post_message (GST_ELEMENT_CAST (demux),
637 gst_message_new_element (GST_OBJECT_CAST (demux),
638 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
639 "manifest-uri", G_TYPE_STRING,
640 demux->manifest_uri, "uri", G_TYPE_STRING,
642 "manifest-download-start", GST_TYPE_CLOCK_TIME,
644 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
645 gst_util_get_timestamp (), NULL)));
648 /* Send duration message */
649 if (!gst_adaptive_demux_is_live (demux)) {
650 GstClockTime duration = demux_class->get_duration (demux);
652 if (duration != GST_CLOCK_TIME_NONE) {
653 GST_DEBUG_OBJECT (demux,
654 "Sending duration message : %" GST_TIME_FORMAT,
655 GST_TIME_ARGS (duration));
656 gst_element_post_message (GST_ELEMENT (demux),
657 gst_message_new_duration_changed (GST_OBJECT (demux)));
659 GST_DEBUG_OBJECT (demux,
660 "media duration unknown, can not send the duration message");
664 if (demux->next_streams) {
665 gst_adaptive_demux_expose_streams (demux,
666 gst_adaptive_demux_is_live (demux));
667 gst_adaptive_demux_start_tasks (demux);
668 if (gst_adaptive_demux_is_live (demux)) {
669 /* Task to periodically update the manifest */
670 gst_task_start (demux->priv->updates_task);
674 GST_WARNING_OBJECT (demux, "No streams created from manifest");
675 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
676 (_("This file contains no playable streams.")),
677 ("No known stream formats found at the Manifest"));
682 GST_MANIFEST_UNLOCK (demux);
683 GST_API_UNLOCK (demux);
685 gst_event_unref (event);
688 case GST_EVENT_SEGMENT:
689 /* Swallow newsegments, we'll push our own */
690 gst_event_unref (event);
696 return gst_pad_event_default (pad, parent, event);
700 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
703 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
705 GST_MANIFEST_LOCK (demux);
707 gst_adapter_push (demux->priv->input_adapter, buffer);
709 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
710 (gint) gst_adapter_available (demux->priv->input_adapter));
712 GST_MANIFEST_UNLOCK (demux);
716 /* must be called with manifest_lock taken */
718 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
720 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
725 /* take ownership of old_streams before releasing the manifest_lock in
726 * gst_adaptive_demux_stop_tasks
728 old_streams = demux->priv->old_streams;
729 demux->priv->old_streams = NULL;
731 gst_adaptive_demux_stop_tasks (demux);
732 gst_uri_downloader_reset (demux->downloader);
735 klass->reset (demux);
737 eos = gst_event_new_eos ();
738 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
739 GstAdaptiveDemuxStream *stream = iter->data;
741 gst_pad_push_event (stream->pad, gst_event_ref (eos));
742 gst_pad_set_active (stream->pad, FALSE);
744 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
746 gst_adaptive_demux_stream_free (stream);
748 gst_event_unref (eos);
749 g_list_free (demux->streams);
750 demux->streams = NULL;
753 g_list_free_full (old_streams,
754 (GDestroyNotify) gst_adaptive_demux_stream_free);
757 g_free (demux->manifest_uri);
758 g_free (demux->manifest_base_uri);
759 demux->manifest_uri = NULL;
760 demux->manifest_base_uri = NULL;
762 gst_adapter_clear (demux->priv->input_adapter);
763 demux->priv->have_manifest = FALSE;
765 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
767 demux->have_group_id = FALSE;
768 demux->group_id = G_MAXUINT;
769 demux->priv->segment_seqnum = gst_util_seqnum_next ();
773 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
775 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
777 switch (GST_MESSAGE_TYPE (msg)) {
778 case GST_MESSAGE_ERROR:{
780 GstAdaptiveDemuxStream *stream;
783 gchar *new_error = NULL;
785 GST_MANIFEST_LOCK (demux);
787 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
789 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
790 GST_OBJECT_CAST (stream->src))) {
791 gst_message_parse_error (msg, &err, &debug);
793 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
794 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
795 err->message, debug);
798 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
800 g_free (err->message);
801 err->message = new_error;
804 /* error, but ask to retry */
805 gst_adaptive_demux_stream_fragment_download_finish (stream,
806 GST_FLOW_CUSTOM_ERROR, err);
814 GST_MANIFEST_UNLOCK (demux);
816 gst_message_unref (msg);
825 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
829 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
832 GST_API_LOCK (demux);
833 GST_MANIFEST_LOCK (demux);
834 demux->stream_struct_size = struct_size;
835 GST_MANIFEST_UNLOCK (demux);
836 GST_API_UNLOCK (demux);
839 /* must be called with manifest_lock taken */
841 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
842 GstAdaptiveDemuxStream * stream)
844 GstPad *pad = stream->pad;
845 gchar *name = gst_pad_get_name (pad);
849 gst_pad_set_active (pad, TRUE);
850 stream->need_header = TRUE;
852 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
855 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
856 GST_EVENT_STREAM_START, 0);
858 if (gst_event_parse_group_id (event, &demux->group_id))
859 demux->have_group_id = TRUE;
861 demux->have_group_id = FALSE;
862 gst_event_unref (event);
863 } else if (!demux->have_group_id) {
864 demux->have_group_id = TRUE;
865 demux->group_id = gst_util_group_id_next ();
867 event = gst_event_new_stream_start (stream_id);
868 if (demux->have_group_id)
869 gst_event_set_group_id (event, demux->group_id);
871 gst_pad_push_event (pad, event);
875 GST_DEBUG_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
876 GST_DEBUG_PAD_NAME (pad), stream->pending_caps);
878 if (stream->pending_caps) {
879 gst_pad_set_caps (pad, stream->pending_caps);
880 gst_caps_unref (stream->pending_caps);
881 stream->pending_caps = NULL;
884 stream->discont = TRUE;
886 gst_object_ref (pad);
888 return gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
891 /* must be called with manifest_lock taken */
893 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
894 GstAdaptiveDemuxStream * stream)
896 GstAdaptiveDemuxClass *klass;
898 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
900 if (klass->get_presentation_offset == NULL)
903 return klass->get_presentation_offset (demux, stream);
906 /* must be called with manifest_lock taken */
908 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
910 GstAdaptiveDemuxClass *klass;
912 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
914 if (klass->get_period_start_time == NULL)
917 return klass->get_period_start_time (demux);
920 /* must be called with manifest_lock taken */
922 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
923 gboolean first_and_live)
927 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
929 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
931 old_streams = demux->streams;
932 demux->streams = demux->next_streams;
933 demux->next_streams = NULL;
935 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
936 GstAdaptiveDemuxStream *stream = iter->data;
938 if (!gst_adaptive_demux_expose_stream (demux,
939 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
940 /* TODO act on error */
943 if (first_and_live) {
944 /* TODO we only need the first timestamp, maybe create a simple function */
945 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
947 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
948 min_pts = MIN (min_pts, stream->fragment.timestamp);
950 min_pts = stream->fragment.timestamp;
955 /* For live streams, the subclass is supposed to seek to the current
956 * fragment and then tell us its timestamp in stream->fragment.timestamp.
957 * We now also have to seek our demuxer segment to reflect this.
959 * FIXME: This needs some refactoring at some point.
961 if (first_and_live) {
962 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
963 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts, GST_SEEK_TYPE_NONE, -1,
967 period_start = gst_adaptive_demux_get_period_start_time (demux);
969 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
970 GstAdaptiveDemuxStream *stream = iter->data;
973 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
974 stream->segment = demux->segment;
976 /* The demuxer segment is just built from seek events, but for each stream
977 * we have to adjust segments according to the current period and the
978 * stream specific presentation time offset.
980 * For each period, buffer timestamps start again from 0. Additionally the
981 * buffer timestamps are shifted by the stream specific presentation time
982 * offset, so the first buffer timestamp of a period is 0 + presentation
983 * time offset. If the stream contains timestamps itself, this is also
984 * supposed to be the presentation time stored inside the stream.
986 * The stream time over periods is supposed to be continuous, that is the
987 * buffer timestamp 0 + presentation time offset should map to the start
988 * time of the current period.
991 * The adjustment of the stream segments as such works the following.
993 * If the demuxer segment start is bigger than the period start, this
994 * means that we have to drop some media at the beginning of the current
995 * period, e.g. because a seek into the middle of the period has
996 * happened. The amount of media to drop is the difference between the
997 * period start and the demuxer segment start, and as each period starts
998 * again from 0, this difference is going to be the actual stream's
999 * segment start. As all timestamps of the stream are shifted by the
1000 * presentation time offset, we will also have to move the segment start
1003 * Now the running time and stream time at the stream's segment start has to
1004 * be the one that is stored inside the demuxer's segment, which means
1005 * that segment.base and segment.time have to be copied over.
1008 * If the demuxer segment start is smaller than the period start time,
1009 * this means that the whole period is inside the segment. As each period
1010 * starts timestamps from 0, and additionally timestamps are shifted by
1011 * the presentation time offset, the stream's first timestamp (and as such
1012 * the stream's segment start) has to be the presentation time offset.
1013 * The stream time at the segment start is supposed to be the stream time
1014 * of the period start according to the demuxer segment, so the stream
1015 * segment's time would be set to that. The same goes for the stream
1016 * segment's base, which is supposed to be the running time of the period
1017 * start according to the demuxer's segment.
1020 * For the first case where not the complete period is inside the segment,
1021 * the segment time and base as calculated by the second case would be
1025 if (demux->segment.start > period_start) {
1026 stream->segment.start = demux->segment.start - period_start + offset;
1027 stream->segment.position = offset;
1028 stream->segment.time = demux->segment.time;
1029 stream->segment.base = demux->segment.base;
1031 stream->segment.start = offset;
1032 stream->segment.position = offset;
1033 stream->segment.time =
1034 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1036 stream->segment.base =
1037 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1041 stream->pending_segment = gst_event_new_segment (&stream->segment);
1042 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1045 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1048 GstEvent *eos = gst_event_new_eos ();
1050 /* before we put streams in the demux->priv->old_streams list,
1051 * we ask the download task to stop. In this way, it will no longer be
1052 * allowed to change the demux object.
1054 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1055 GstAdaptiveDemuxStream *stream = iter->data;
1057 GST_LOG_OBJECT (stream->pad, "Removing stream");
1058 gst_pad_push_event (stream->pad, gst_event_ref (eos));
1059 gst_pad_set_active (stream->pad, FALSE);
1060 gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
1062 /* ask the download task to stop.
1063 * We will not join it now, because our thread can be one of these tasks.
1064 * We will do the joining later, from another stream download task or
1065 * from gst_adaptive_demux_stop_tasks.
1066 * We also cannot change the state of the stream->src element, because
1067 * that will wait on the streaming thread (which could be this thread)
1069 * Because we sent an EOS to the downstream element, the stream->src
1070 * element should detect this in its streaming task and stop.
1071 * Even if it doesn't do that, we will change its state later in
1072 * gst_adaptive_demux_stop_tasks.
1074 gst_task_stop (stream->download_task);
1075 g_mutex_lock (&stream->fragment_download_lock);
1076 stream->cancelled = TRUE;
1077 g_cond_signal (&stream->fragment_download_cond);
1078 g_mutex_unlock (&stream->fragment_download_lock);
1080 gst_event_unref (eos);
1082 /* The list should be freed from another thread as we can't properly
1083 * cleanup a GstTask from itself */
1084 demux->priv->old_streams =
1085 g_list_concat (demux->priv->old_streams, old_streams);
1091 /* must be called with manifest_lock taken */
1092 GstAdaptiveDemuxStream *
1093 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1095 GstAdaptiveDemuxStream *stream;
1097 stream = g_malloc0 (demux->stream_struct_size);
1099 /* Downloading task */
1100 g_rec_mutex_init (&stream->download_lock);
1101 stream->download_task =
1102 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1104 gst_task_set_lock (stream->download_task, &stream->download_lock);
1107 stream->demux = demux;
1108 stream->fragment_bitrates =
1109 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1110 gst_pad_set_element_private (pad, stream);
1112 gst_pad_set_query_function (pad,
1113 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1114 gst_pad_set_event_function (pad,
1115 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1117 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1118 g_cond_init (&stream->fragment_download_cond);
1119 g_mutex_init (&stream->fragment_download_lock);
1121 demux->next_streams = g_list_append (demux->next_streams, stream);
1126 GstAdaptiveDemuxStream *
1127 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1131 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1132 GstAdaptiveDemuxStream *stream = iter->data;
1133 if (stream->pad == pad) {
1140 /* must be called with manifest_lock taken.
1141 * It will temporarily drop the manifest_lock in order to join the task.
1142 * It will join only the old_streams (the demux->streams are joined by
1143 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1147 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1149 GstAdaptiveDemux *demux = stream->demux;
1150 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1152 if (klass->stream_free)
1153 klass->stream_free (stream);
1155 g_clear_error (&stream->last_error);
1156 if (stream->download_task) {
1157 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1158 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1159 GST_DEBUG_PAD_NAME (stream->pad));
1161 gst_task_stop (stream->download_task);
1163 g_mutex_lock (&stream->fragment_download_lock);
1164 stream->cancelled = TRUE;
1165 g_cond_signal (&stream->fragment_download_cond);
1166 g_mutex_unlock (&stream->fragment_download_lock);
1168 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1170 /* temporarily drop the manifest lock to join the task */
1171 GST_MANIFEST_UNLOCK (demux);
1173 gst_task_join (stream->download_task);
1175 GST_MANIFEST_LOCK (demux);
1177 GST_LOG_OBJECT (demux, "Finished");
1178 gst_object_unref (stream->download_task);
1179 g_rec_mutex_clear (&stream->download_lock);
1180 stream->download_task = NULL;
1183 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1185 if (stream->pending_segment) {
1186 gst_event_unref (stream->pending_segment);
1187 stream->pending_segment = NULL;
1190 if (stream->pending_events) {
1191 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1192 stream->pending_events = NULL;
1195 if (stream->internal_pad) {
1196 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1199 if (stream->src_srcpad) {
1200 gst_object_unref (stream->src_srcpad);
1201 stream->src_srcpad = NULL;
1205 gst_element_set_state (stream->src, GST_STATE_NULL);
1206 gst_bin_remove (GST_BIN_CAST (demux), stream->src);
1210 g_cond_clear (&stream->fragment_download_cond);
1211 g_mutex_clear (&stream->fragment_download_lock);
1212 g_free (stream->fragment_bitrates);
1215 gst_object_unref (stream->pad);
1218 if (stream->pending_caps)
1219 gst_caps_unref (stream->pending_caps);
1221 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1226 /* must be called with manifest_lock taken */
1228 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1229 gint64 * range_start, gint64 * range_stop)
1231 GstAdaptiveDemuxClass *klass;
1233 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1235 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1237 return klass->get_live_seek_range (demux, range_start, range_stop);
1240 /* must be called with manifest_lock taken */
1242 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1244 GstAdaptiveDemuxClass *klass;
1246 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1247 if (gst_adaptive_demux_is_live (demux)) {
1248 return klass->get_live_seek_range != NULL;
1251 return klass->seek != NULL;
1254 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
1255 GST_SEEK_FLAG_SNAP_AFTER | \
1256 GST_SEEK_FLAG_SNAP_NEAREST))
1257 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1258 GST_SEEK_FLAG_SNAP_AFTER | \
1259 GST_SEEK_FLAG_SNAP_NEAREST))
1262 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1265 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1269 GstSeekType start_type, stop_type;
1274 GstSegment oldsegment;
1275 GstAdaptiveDemuxStream *stream = NULL;
1277 GST_INFO_OBJECT (demux, "Received seek event");
1279 GST_API_LOCK (demux);
1280 GST_MANIFEST_LOCK (demux);
1282 if (!gst_adaptive_demux_can_seek (demux)) {
1283 GST_MANIFEST_UNLOCK (demux);
1284 GST_API_UNLOCK (demux);
1285 gst_event_unref (event);
1289 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1292 if (format != GST_FORMAT_TIME) {
1293 GST_MANIFEST_UNLOCK (demux);
1294 GST_API_UNLOCK (demux);
1295 gst_event_unref (event);
1299 if (gst_adaptive_demux_is_live (demux)) {
1300 gint64 range_start, range_stop;
1301 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1303 GST_MANIFEST_UNLOCK (demux);
1304 GST_API_UNLOCK (demux);
1305 gst_event_unref (event);
1308 if (start < range_start || start >= range_stop) {
1309 GST_MANIFEST_UNLOCK (demux);
1310 GST_API_UNLOCK (demux);
1311 GST_WARNING_OBJECT (demux, "Seek to invalid position");
1312 gst_event_unref (event);
1317 seqnum = gst_event_get_seqnum (event);
1319 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1321 /* have a backup in case seek fails */
1322 gst_segment_copy_into (&demux->segment, &oldsegment);
1324 if (flags & GST_SEEK_FLAG_FLUSH) {
1327 GST_DEBUG_OBJECT (demux, "sending flush start");
1328 fevent = gst_event_new_flush_start ();
1329 gst_event_set_seqnum (fevent, seqnum);
1330 gst_adaptive_demux_push_src_event (demux, fevent);
1332 gst_adaptive_demux_stop_tasks (demux);
1333 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1334 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1336 gst_adaptive_demux_stop_tasks (demux);
1339 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1342 * Handle snap seeks as follows:
1343 * 1) do the snap seeking on the stream that received
1345 * 2) use the final position on this stream to seek
1346 * on the other streams to the same position
1348 * We can't snap at all streams at the same time as
1349 * they might end in different positions, so just
1350 * use the one that received the event as the 'leading'
1351 * one to do the snap seek.
1353 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1354 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1356 GstSeekFlags stream_seek_flags = flags;
1358 /* snap-seek on the stream that received the event and then
1359 * use the resulting position to seek on all streams */
1362 if (start_type != GST_SEEK_TYPE_NONE)
1365 ts = stream->segment.position;
1366 start_type = GST_SEEK_TYPE_SET;
1369 if (stop_type != GST_SEEK_TYPE_NONE)
1372 stop_type = GST_SEEK_TYPE_SET;
1373 ts = stream->segment.position;
1377 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1379 /* replace event with a new one without snaping to seek on all streams */
1380 gst_event_unref (event);
1387 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1388 start_type, start, stop_type, stop);
1389 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1393 gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1394 start, stop_type, stop, &update);
1396 /* FIXME - this seems unatural, do_seek() is updating base when we
1397 * only want the start/stop position to change, maybe do_seek() needs
1399 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1400 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1401 && stop_type == GST_SEEK_TYPE_NONE))) {
1402 demux->segment.base = oldsegment.base;
1405 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1407 ret = demux_class->seek (demux, event);
1410 /* Is there anything else we can do if it fails? */
1411 gst_segment_copy_into (&oldsegment, &demux->segment);
1413 demux->priv->segment_seqnum = seqnum;
1415 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1417 if (flags & GST_SEEK_FLAG_FLUSH) {
1420 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1421 fevent = gst_event_new_flush_stop (TRUE);
1422 gst_event_set_seqnum (fevent, seqnum);
1423 gst_adaptive_demux_push_src_event (demux, fevent);
1426 if (demux->next_streams) {
1427 gst_adaptive_demux_expose_streams (demux, FALSE);
1430 GstClockTime period_start =
1431 gst_adaptive_demux_get_period_start_time (demux);
1433 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1434 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1435 GstAdaptiveDemuxStream *stream = iter->data;
1437 GstClockTime offset;
1439 /* See comments in gst_adaptive_demux_get_period_start_time() for
1440 * an explanation of the segment modifications */
1441 stream->segment = demux->segment;
1443 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1444 stream->segment.start += offset - period_start;
1445 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1446 stream->segment.position = stream->segment.start;
1447 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1448 stream->segment.position = stream->segment.stop;
1449 seg_evt = gst_event_new_segment (&stream->segment);
1450 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1451 gst_event_replace (&stream->pending_segment, seg_evt);
1452 gst_event_unref (seg_evt);
1453 /* Make sure the first buffer after a seek has the discont flag */
1454 stream->discont = TRUE;
1457 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1460 /* Restart the demux */
1461 gst_adaptive_demux_start_tasks (demux);
1462 GST_MANIFEST_UNLOCK (demux);
1463 GST_API_UNLOCK (demux);
1464 gst_event_unref (event);
1470 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1473 GstAdaptiveDemux *demux;
1475 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1477 /* FIXME handle events received on pads that are to be removed */
1479 switch (event->type) {
1480 case GST_EVENT_SEEK:
1482 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1484 case GST_EVENT_RECONFIGURE:{
1485 GstAdaptiveDemuxStream *stream;
1487 GST_MANIFEST_LOCK (demux);
1488 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1491 if (stream->last_ret == GST_FLOW_NOT_LINKED) {
1492 stream->last_ret = GST_FLOW_OK;
1493 stream->restart_download = TRUE;
1494 stream->need_header = TRUE;
1495 stream->discont = TRUE;
1496 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1497 gst_task_start (stream->download_task);
1499 gst_event_unref (event);
1500 GST_MANIFEST_UNLOCK (demux);
1503 GST_MANIFEST_UNLOCK (demux);
1506 case GST_EVENT_LATENCY:{
1507 /* Upstream and our internal source are irrelevant
1508 * for latency, and we should not fail here to
1509 * configure the latency */
1510 gst_event_unref (event);
1517 return gst_pad_event_default (pad, parent, event);
1521 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1524 GstAdaptiveDemux *demux;
1525 GstAdaptiveDemuxClass *demux_class;
1526 gboolean ret = FALSE;
1531 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1532 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1534 switch (query->type) {
1535 case GST_QUERY_DURATION:{
1536 GstClockTime duration = -1;
1539 gst_query_parse_duration (query, &fmt, NULL);
1541 GST_MANIFEST_LOCK (demux);
1543 if (fmt == GST_FORMAT_TIME && demux->priv->have_manifest
1544 && !gst_adaptive_demux_is_live (demux)) {
1545 duration = demux_class->get_duration (demux);
1547 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1548 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1553 GST_MANIFEST_UNLOCK (demux);
1555 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1556 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1559 case GST_QUERY_LATENCY:{
1560 gst_query_set_latency (query, FALSE, 0, -1);
1564 case GST_QUERY_SEEKING:{
1569 GST_MANIFEST_LOCK (demux);
1571 if (!demux->priv->have_manifest) {
1572 GST_MANIFEST_UNLOCK (demux);
1573 GST_INFO_OBJECT (demux,
1574 "Don't have manifest yet, can't answer seeking query");
1575 return FALSE; /* can't answer without manifest */
1578 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1579 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1580 if (fmt == GST_FORMAT_TIME) {
1581 GstClockTime duration;
1582 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1586 if (gst_adaptive_demux_is_live (demux)) {
1587 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1589 GST_MANIFEST_UNLOCK (demux);
1590 GST_INFO_OBJECT (demux, "can't answer seeking query");
1594 duration = demux_class->get_duration (demux);
1595 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1599 gst_query_set_seeking (query, fmt, can_seek, start, stop);
1600 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1601 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1602 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1604 GST_MANIFEST_UNLOCK (demux);
1609 GST_MANIFEST_LOCK (demux);
1611 /* TODO HLS can answer this differently it seems */
1612 if (demux->manifest_uri) {
1613 /* FIXME: (hls) Do we answer with the variant playlist, with the current
1614 * playlist or the the uri of the last downlowaded fragment? */
1615 gst_query_set_uri (query, demux->manifest_uri);
1619 GST_MANIFEST_UNLOCK (demux);
1622 /* Don't forward queries upstream because of the special nature of this
1623 * "demuxer", which relies on the upstream element only to be fed
1632 /* must be called with manifest_lock taken */
1634 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
1638 GST_INFO_OBJECT (demux, "Starting streams' tasks");
1639 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1640 GstAdaptiveDemuxStream *stream = iter->data;
1642 g_mutex_lock (&stream->fragment_download_lock);
1643 stream->cancelled = FALSE;
1644 g_mutex_unlock (&stream->fragment_download_lock);
1646 stream->last_ret = GST_FLOW_OK;
1647 gst_task_start (stream->download_task);
1651 /* must be called with manifest_lock taken
1652 * This function will temporarily release manifest_lock in order to join the
1654 * The api_lock will still protect it against other threads trying to modify
1655 * the demux element.
1658 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
1662 gst_task_stop (demux->priv->updates_task);
1664 g_mutex_lock (&demux->priv->updates_timed_lock);
1665 demux->priv->stop_updates_task = TRUE;
1666 g_cond_signal (&demux->priv->updates_timed_cond);
1667 g_mutex_unlock (&demux->priv->updates_timed_lock);
1669 gst_uri_downloader_cancel (demux->downloader);
1671 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1672 GstAdaptiveDemuxStream *stream = iter->data;
1674 g_mutex_lock (&stream->fragment_download_lock);
1675 stream->cancelled = TRUE;
1676 gst_task_stop (stream->download_task);
1677 g_cond_signal (&stream->fragment_download_cond);
1678 g_mutex_unlock (&stream->fragment_download_lock);
1681 g_mutex_lock (&demux->priv->manifest_update_lock);
1682 g_cond_broadcast (&demux->priv->manifest_cond);
1683 g_mutex_unlock (&demux->priv->manifest_update_lock);
1685 /* need to release manifest_lock before stopping the src element.
1686 * The streams were asked to cancel, so they will not make any writes to demux
1687 * object. Even if we temporarily release manifest_lock, the demux->streams
1688 * cannot change and iter cannot be invalidated.
1690 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1691 GstAdaptiveDemuxStream *stream = iter->data;
1692 GstElement *src = stream->src;
1694 GST_MANIFEST_UNLOCK (demux);
1697 gst_element_set_state (src, GST_STATE_READY);
1700 /* stream->download_task value never changes, so it is safe to read it
1701 * outside critical section
1703 gst_task_join (stream->download_task);
1705 GST_MANIFEST_LOCK (demux);
1708 GST_MANIFEST_UNLOCK (demux);
1710 /* demux->priv->updates_task value never changes, so it is safe to read it
1711 * outside critical section
1713 gst_task_join (demux->priv->updates_task);
1715 GST_MANIFEST_LOCK (demux);
1717 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1718 GstAdaptiveDemuxStream *stream = iter->data;
1720 stream->download_error_count = 0;
1721 stream->need_header = TRUE;
1725 /* must be called with manifest_lock taken */
1727 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
1730 gboolean ret = TRUE;
1732 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1733 GstAdaptiveDemuxStream *stream = iter->data;
1734 gst_event_ref (event);
1735 ret = ret & gst_pad_push_event (stream->pad, event);
1737 gst_event_unref (event);
1741 /* must be called with manifest_lock taken */
1743 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
1746 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
1748 gst_caps_replace (&stream->pending_caps, caps);
1749 gst_caps_unref (caps);
1752 /* must be called with manifest_lock taken */
1754 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
1757 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
1759 if (stream->pending_tags) {
1760 gst_tag_list_unref (stream->pending_tags);
1762 stream->pending_tags = tags;
1765 /* must be called with manifest_lock taken */
1767 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
1770 stream->pending_events = g_list_append (stream->pending_events, event);
1773 /* must be called with manifest_lock taken */
1775 _update_average_bitrate (GstAdaptiveDemux * demux,
1776 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
1778 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
1780 stream->moving_bitrate -= stream->fragment_bitrates[index];
1781 stream->fragment_bitrates[index] = new_bitrate;
1782 stream->moving_bitrate += new_bitrate;
1784 stream->moving_index += 1;
1786 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
1787 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
1788 return stream->moving_bitrate / stream->moving_index;
1791 /* must be called with manifest_lock taken */
1793 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
1794 GstAdaptiveDemuxStream * stream)
1796 guint64 average_bitrate;
1797 guint64 fragment_bitrate;
1799 if (demux->connection_speed) {
1800 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
1801 demux->connection_speed / 1000);
1802 return demux->connection_speed;
1805 g_object_get (stream->queue, "avg-in-rate", &fragment_bitrate, NULL);
1806 fragment_bitrate *= 8;
1807 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
1810 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
1812 GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
1814 GST_INFO_OBJECT (stream,
1815 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
1816 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
1818 /* Conservative approach, make sure we don't upgrade too fast */
1819 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
1821 stream->current_download_rate *= demux->bitrate_limit;
1822 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
1823 G_GUINT64_FORMAT, demux->bitrate_limit,
1824 stream->current_download_rate * 8);
1825 return stream->current_download_rate;
1828 /* must be called with manifest_lock taken */
1829 static GstFlowReturn
1830 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
1832 gboolean all_notlinked = TRUE;
1833 gboolean all_eos = TRUE;
1836 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1837 GstAdaptiveDemuxStream *stream = iter->data;
1839 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
1840 all_notlinked = FALSE;
1841 if (stream->last_ret != GST_FLOW_EOS)
1845 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
1846 || stream->last_ret == GST_FLOW_FLUSHING) {
1847 return stream->last_ret;
1851 return GST_FLOW_NOT_LINKED;
1853 return GST_FLOW_EOS;
1857 /* must be called with manifest_lock taken.
1858 * Temporarily releases manifest_lock
1861 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
1864 GstAdaptiveDemux *demux = stream->demux;
1865 GstFlowReturn ret = GST_FLOW_OK;
1866 gboolean discont = FALSE;
1867 /* Pending events */
1868 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
1869 GList *pending_events = NULL;
1871 if (stream->first_fragment_buffer) {
1872 GstClockTime offset =
1873 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1874 GstClockTime period_start =
1875 gst_adaptive_demux_get_period_start_time (demux);
1877 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1878 if (demux->segment.rate < 0)
1879 /* Set DISCONT flag for every first buffer in reverse playback mode
1880 * as each fragment for its own has to be reversed */
1883 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
1884 if (GST_BUFFER_PTS_IS_VALID (buffer))
1885 GST_BUFFER_PTS (buffer) += offset;
1887 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
1888 stream->segment.position = GST_BUFFER_PTS (buffer);
1890 /* Convert from position inside the stream's segment to the demuxer's
1891 * segment, they are not necessarily the same */
1892 if (stream->segment.position - offset + period_start >
1893 demux->segment.position)
1894 demux->segment.position =
1895 stream->segment.position - offset + period_start;
1897 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1899 GST_LOG_OBJECT (stream->pad,
1900 "Going to push buffer with PTS %" GST_TIME_FORMAT,
1901 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
1903 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
1906 if (stream->discont) {
1908 stream->discont = FALSE;
1912 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
1913 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1915 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1918 stream->first_fragment_buffer = FALSE;
1920 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
1921 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
1922 if (G_UNLIKELY (stream->pending_caps)) {
1923 pending_caps = gst_event_new_caps (stream->pending_caps);
1924 gst_caps_unref (stream->pending_caps);
1925 stream->pending_caps = NULL;
1927 if (G_UNLIKELY (stream->pending_segment)) {
1928 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1929 pending_segment = stream->pending_segment;
1930 stream->pending_segment = NULL;
1931 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1933 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
1934 GstTagList *tags = stream->pending_tags;
1936 stream->pending_tags = NULL;
1937 stream->bitrate_changed = 0;
1939 if (stream->fragment.bitrate != 0) {
1941 tags = gst_tag_list_make_writable (tags);
1943 tags = gst_tag_list_new_empty ();
1945 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
1946 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
1948 pending_tags = gst_event_new_tag (tags);
1950 if (G_UNLIKELY (stream->pending_events)) {
1951 pending_events = stream->pending_events;
1952 stream->pending_events = NULL;
1955 GST_MANIFEST_UNLOCK (demux);
1957 /* Do not push events or buffers holding the manifest lock */
1958 if (G_UNLIKELY (pending_caps)) {
1959 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
1961 gst_pad_push_event (stream->pad, pending_caps);
1963 if (G_UNLIKELY (pending_segment)) {
1964 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
1966 gst_pad_push_event (stream->pad, pending_segment);
1968 if (G_UNLIKELY (pending_tags)) {
1969 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
1971 gst_pad_push_event (stream->pad, pending_tags);
1973 while (pending_events != NULL) {
1974 GstEvent *event = pending_events->data;
1976 if (!gst_pad_push_event (stream->pad, event))
1977 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
1979 pending_events = g_list_delete_link (pending_events, pending_events);
1982 ret = gst_pad_push (stream->pad, buffer);
1984 GST_MANIFEST_LOCK (demux);
1986 g_mutex_lock (&stream->fragment_download_lock);
1987 if (G_UNLIKELY (stream->cancelled)) {
1988 ret = stream->last_ret = GST_FLOW_FLUSHING;
1989 g_mutex_unlock (&stream->fragment_download_lock);
1992 g_mutex_unlock (&stream->fragment_download_lock);
1994 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
1995 gst_flow_get_name (ret));
2000 /* must be called with manifest_lock taken */
2001 static GstFlowReturn
2002 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2003 GstAdaptiveDemuxStream * stream)
2005 /* No need to advance, this isn't a real fragment */
2006 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2009 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2010 stream->fragment.duration);
2013 /* must be called with manifest_lock taken.
2014 * Can temporarily release manifest_lock
2016 static GstFlowReturn
2017 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2018 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2020 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2023 static GstFlowReturn
2024 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2026 GstAdaptiveDemuxStream *stream;
2027 GstAdaptiveDemux *demux;
2028 GstAdaptiveDemuxClass *klass;
2029 GstFlowReturn ret = GST_FLOW_OK;
2031 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2032 stream = gst_pad_get_element_private (pad);
2033 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2035 GST_MANIFEST_LOCK (demux);
2037 /* do not make any changes if the stream is cancelled */
2038 g_mutex_lock (&stream->fragment_download_lock);
2039 if (G_UNLIKELY (stream->cancelled)) {
2040 g_mutex_unlock (&stream->fragment_download_lock);
2041 gst_buffer_unref (buffer);
2042 ret = stream->last_ret = GST_FLOW_FLUSHING;
2043 GST_MANIFEST_UNLOCK (demux);
2046 g_mutex_unlock (&stream->fragment_download_lock);
2048 if (stream->starting_fragment) {
2049 GstClockTime offset =
2050 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2051 GstClockTime period_start =
2052 gst_adaptive_demux_get_period_start_time (demux);
2054 stream->starting_fragment = FALSE;
2055 if (klass->start_fragment) {
2056 if (!klass->start_fragment (demux, stream)) {
2057 ret = GST_FLOW_ERROR;
2062 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2063 if (GST_BUFFER_PTS_IS_VALID (buffer))
2064 GST_BUFFER_PTS (buffer) += offset;
2066 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2067 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2069 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2070 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2071 stream->segment.position = GST_BUFFER_PTS (buffer);
2073 /* Convert from position inside the stream's segment to the demuxer's
2074 * segment, they are not necessarily the same */
2075 if (stream->segment.position - offset + period_start >
2076 demux->segment.position)
2077 demux->segment.position =
2078 stream->segment.position - offset + period_start;
2079 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2083 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2085 if (stream->downloading_first_buffer) {
2086 gint64 chunk_size = 0;
2088 stream->downloading_first_buffer = FALSE;
2090 if (!stream->downloading_header && !stream->downloading_index) {
2091 /* If this is the first buffer of a fragment (not the headers or index)
2092 * and we don't have a birate from the sub-class, then see if we
2093 * can work it out from the fragment size and duration */
2094 if (stream->fragment.bitrate == 0 &&
2095 stream->fragment.duration != 0 &&
2096 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2098 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2099 8 * GST_SECOND, stream->fragment.duration));
2100 GST_LOG_OBJECT (demux,
2101 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2102 " = bitrate %u", chunk_size,
2103 GST_TIME_ARGS (stream->fragment.duration), bitrate);
2104 stream->fragment.bitrate = bitrate;
2106 if (stream->fragment.bitrate) {
2107 stream->bitrate_changed = TRUE;
2109 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2114 stream->download_total_time +=
2115 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) -
2116 stream->download_chunk_start_time;
2117 stream->download_total_bytes += gst_buffer_get_size (buffer);
2119 GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2120 gst_buffer_get_size (buffer));
2122 ret = klass->data_received (demux, stream, buffer);
2124 if (ret == GST_FLOW_FLUSHING) {
2125 /* do not make any changes if the stream is cancelled */
2126 g_mutex_lock (&stream->fragment_download_lock);
2127 if (G_UNLIKELY (stream->cancelled)) {
2128 g_mutex_unlock (&stream->fragment_download_lock);
2129 GST_MANIFEST_UNLOCK (demux);
2132 g_mutex_unlock (&stream->fragment_download_lock);
2135 stream->download_chunk_start_time =
2136 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
2138 if (ret != GST_FLOW_OK) {
2139 if (ret < GST_FLOW_EOS) {
2140 GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
2141 ("stream stopped, reason %s", gst_flow_get_name (ret)));
2143 /* TODO push this on all pads */
2144 gst_pad_push_event (stream->pad, gst_event_new_eos ());
2146 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2147 gst_flow_get_name (ret));
2150 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2151 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH)
2152 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2157 GST_MANIFEST_UNLOCK (demux);
2162 /* must be called with manifest_lock taken */
2164 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2165 stream, GstFlowReturn ret, GError * err)
2167 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2168 gst_flow_get_name (ret), err);
2170 /* if we have an error, only replace last_ret if it was OK before to avoid
2171 * overwriting the first error we got */
2172 if (stream->last_ret == GST_FLOW_OK) {
2173 stream->last_ret = ret;
2175 g_clear_error (&stream->last_error);
2176 stream->last_error = g_error_copy (err);
2179 g_mutex_lock (&stream->fragment_download_lock);
2180 stream->download_finished = TRUE;
2181 g_cond_signal (&stream->fragment_download_cond);
2182 g_mutex_unlock (&stream->fragment_download_lock);
2186 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2188 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2189 GstAdaptiveDemux *demux = stream->demux;
2191 switch (GST_EVENT_TYPE (event)) {
2192 case GST_EVENT_EOS:{
2193 GstAdaptiveDemuxClass *klass;
2196 GST_MANIFEST_LOCK (demux);
2198 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2199 ret = klass->finish_fragment (demux, stream);
2200 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2202 GST_MANIFEST_UNLOCK (demux);
2209 gst_event_unref (event);
2215 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2217 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2219 switch (GST_QUERY_TYPE (query)) {
2220 case GST_QUERY_ALLOCATION:
2227 return gst_pad_peer_query (stream->pad, query);
2230 /* must be called with manifest_lock taken.
2231 * Can temporarily release manifest_lock
2234 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2235 GstAdaptiveDemuxStream * stream)
2237 gboolean ret = TRUE;
2239 /* Wait until we're cancelled or there's something for
2240 * us to download in the playlist or the playlist
2241 * became non-live */
2243 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2245 /* get the manifest_update_lock while still holding the manifest_lock.
2246 * This will prevent other threads to signal the condition (they will need
2247 * both manifest_lock and manifest_update_lock in order to signal).
2248 * It cannot deadlock because all threads always get the manifest_lock first
2249 * and manifest_update_lock second.
2251 g_mutex_lock (&demux->priv->manifest_update_lock);
2253 GST_MANIFEST_UNLOCK (demux);
2255 g_cond_wait (&demux->priv->manifest_cond,
2256 &demux->priv->manifest_update_lock);
2257 g_mutex_unlock (&demux->priv->manifest_update_lock);
2259 GST_MANIFEST_LOCK (demux);
2261 /* check for cancelled every time we get the manifest_lock */
2262 g_mutex_lock (&stream->fragment_download_lock);
2263 if (G_UNLIKELY (stream->cancelled)) {
2265 stream->last_ret = GST_FLOW_FLUSHING;
2266 g_mutex_unlock (&stream->fragment_download_lock);
2269 g_mutex_unlock (&stream->fragment_download_lock);
2271 /* Got a new fragment or not live anymore? */
2272 if (gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
2273 GST_DEBUG_OBJECT (demux, "new fragment available, "
2274 "not waiting for manifest update");
2279 if (!gst_adaptive_demux_is_live (demux)) {
2280 GST_DEBUG_OBJECT (demux, "Not live anymore, "
2281 "not waiting for manifest update");
2286 GST_DEBUG_OBJECT (demux, "Retrying now");
2290 /* must be called with manifest_lock taken */
2292 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2293 const gchar * uri, const gchar * referer, gboolean refresh,
2294 gboolean allow_cache)
2296 GstAdaptiveDemux *demux = stream->demux;
2298 if (!gst_uri_is_valid (uri)) {
2299 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2303 if (stream->src != NULL) {
2304 gchar *old_protocol, *new_protocol;
2307 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2308 old_protocol = gst_uri_get_protocol (old_uri);
2309 new_protocol = gst_uri_get_protocol (uri);
2311 if (!g_str_equal (old_protocol, new_protocol)) {
2312 gst_object_unref (stream->src_srcpad);
2313 gst_element_set_state (stream->src, GST_STATE_NULL);
2314 gst_bin_remove (GST_BIN_CAST (demux), stream->src);
2316 stream->src_srcpad = NULL;
2317 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2321 GST_DEBUG_OBJECT (demux, "Re-using old source element");
2322 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2324 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2326 g_clear_error (&err);
2327 gst_object_unref (stream->src_srcpad);
2328 gst_element_set_state (stream->src, GST_STATE_NULL);
2329 gst_bin_remove (GST_BIN_CAST (demux), stream->src);
2331 stream->src_srcpad = NULL;
2335 g_free (old_protocol);
2336 g_free (new_protocol);
2339 if (stream->src == NULL) {
2340 GstPad *uri_handler_src;
2343 GstElement *uri_handler;
2345 GstPadLinkReturn pad_link_ret;
2346 GObjectClass *gobject_class;
2347 gchar *internal_name, *bin_name;
2349 /* Our src consists of a bin containing uri_handler -> queue2 . The
2350 * purpose of the queue2 is to allow the uri_handler to download an
2351 * entire fragment without blocking, so we can accurately measure the
2352 * download bitrate. */
2354 queue = gst_element_factory_make ("queue2", NULL);
2358 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
2359 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
2360 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
2362 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
2363 if (uri_handler == NULL) {
2364 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
2365 ("Missing plugin to handle URI: '%s'", uri), (NULL));
2366 gst_object_unref (queue);
2370 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
2372 if (g_object_class_find_property (gobject_class, "compress"))
2373 g_object_set (uri_handler, "compress", FALSE, NULL);
2374 if (g_object_class_find_property (gobject_class, "keep-alive"))
2375 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
2376 if (g_object_class_find_property (gobject_class, "extra-headers")) {
2377 if (referer || refresh || !allow_cache) {
2378 GstStructure *extra_headers = gst_structure_new_empty ("headers");
2381 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
2385 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
2388 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
2391 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
2393 gst_structure_free (extra_headers);
2395 g_object_set (uri_handler, "extra-headers", NULL, NULL);
2399 /* Source bin creation */
2400 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
2401 stream->src = gst_bin_new (bin_name);
2403 if (stream->src == NULL) {
2404 gst_object_unref (queue);
2405 gst_object_unref (uri_handler);
2409 gst_bin_add (GST_BIN_CAST (stream->src), queue);
2410 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
2412 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
2413 queue_sink = gst_element_get_static_pad (queue, "sink");
2416 gst_pad_link_full (uri_handler_src, queue_sink,
2417 GST_PAD_LINK_CHECK_NOTHING);
2418 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
2419 GST_WARNING_OBJECT (demux,
2420 "Could not link pads %s:%s to %s:%s for reason %d",
2421 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
2423 g_object_unref (queue_sink);
2424 g_object_unref (uri_handler_src);
2425 gst_object_unref (stream->src);
2430 g_object_unref (queue_sink);
2431 g_object_unref (uri_handler_src);
2432 queue_src = gst_element_get_static_pad (queue, "src");
2433 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
2434 g_object_unref (queue_src);
2435 gst_element_add_pad (stream->src, stream->src_srcpad);
2437 gst_element_set_locked_state (stream->src, TRUE);
2438 gst_bin_add (GST_BIN_CAST (demux), stream->src);
2439 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
2441 /* set up our internal floating pad to drop all events from
2442 * the http src we don't care about. On the chain function
2443 * we just push the buffer forward */
2444 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
2445 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
2446 g_free (internal_name);
2447 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
2448 GST_OBJECT_CAST (demux));
2449 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
2450 gst_pad_set_element_private (stream->internal_pad, stream);
2451 gst_pad_set_active (stream->internal_pad, TRUE);
2452 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
2453 gst_pad_set_event_function (stream->internal_pad, _src_event);
2454 gst_pad_set_query_function (stream->internal_pad, _src_query);
2456 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
2457 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2458 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
2462 stream->uri_handler = uri_handler;
2463 stream->queue = queue;
2468 /* must be called with manifest_lock taken.
2469 * Can temporarily release manifest_lock
2471 static GstFlowReturn
2472 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
2473 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
2476 GstFlowReturn ret = GST_FLOW_OK;
2477 GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
2478 " - %" G_GINT64_FORMAT, uri, start, end);
2480 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
2481 ret = stream->last_ret = GST_FLOW_ERROR;
2485 if (gst_element_set_state (stream->src,
2486 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
2487 if (start != 0 || end != -1) {
2488 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
2492 /* Send the seek event to the uri_handler, as the other pipeline elements
2493 * can't handle it when READY. */
2494 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
2495 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
2496 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
2498 /* looks like the source can't handle seeks in READY */
2499 g_clear_error (&stream->last_error);
2500 stream->last_error = g_error_new (GST_CORE_ERROR,
2501 GST_CORE_ERROR_NOT_IMPLEMENTED,
2502 "Source element can't handle range requests");
2503 stream->last_ret = GST_FLOW_ERROR;
2507 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
2508 stream->download_start_time =
2509 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
2510 stream->download_chunk_start_time = stream->download_start_time;
2512 /* src element is in state READY. Before we start it, we reset
2515 g_mutex_lock (&stream->fragment_download_lock);
2516 stream->download_finished = FALSE;
2517 stream->downloading_first_buffer = TRUE;
2518 g_mutex_unlock (&stream->fragment_download_lock);
2520 GST_MANIFEST_UNLOCK (demux);
2522 if (!gst_element_sync_state_with_parent (stream->src)) {
2523 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
2524 GST_MANIFEST_LOCK (demux);
2525 ret = stream->last_ret = GST_FLOW_ERROR;
2529 /* wait for the fragment to be completely downloaded */
2530 GST_DEBUG_OBJECT (stream->pad,
2531 "Waiting for fragment download to finish: %s", uri);
2533 g_mutex_lock (&stream->fragment_download_lock);
2534 if (G_UNLIKELY (stream->cancelled)) {
2535 g_mutex_unlock (&stream->fragment_download_lock);
2536 GST_MANIFEST_LOCK (demux);
2537 ret = stream->last_ret = GST_FLOW_FLUSHING;
2540 while (!stream->cancelled && !stream->download_finished) {
2541 g_cond_wait (&stream->fragment_download_cond,
2542 &stream->fragment_download_lock);
2544 g_mutex_unlock (&stream->fragment_download_lock);
2546 GST_MANIFEST_LOCK (demux);
2547 g_mutex_lock (&stream->fragment_download_lock);
2548 if (G_UNLIKELY (stream->cancelled)) {
2549 ret = stream->last_ret = GST_FLOW_FLUSHING;
2550 g_mutex_unlock (&stream->fragment_download_lock);
2553 g_mutex_unlock (&stream->fragment_download_lock);
2555 ret = stream->last_ret;
2557 GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s %d %s",
2558 uri, stream->last_ret, gst_flow_get_name (stream->last_ret));
2561 if (stream->last_ret == GST_FLOW_OK)
2562 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
2563 ret = GST_FLOW_CUSTOM_ERROR;
2566 /* changing src element state might try to join the streaming thread, so
2567 * we must not hold the manifest lock.
2569 GST_MANIFEST_UNLOCK (demux);
2571 gst_element_set_state (stream->src, GST_STATE_READY);
2573 GST_MANIFEST_LOCK (demux);
2574 g_mutex_lock (&stream->fragment_download_lock);
2575 if (G_UNLIKELY (stream->cancelled)) {
2576 ret = stream->last_ret = GST_FLOW_FLUSHING;
2577 g_mutex_unlock (&stream->fragment_download_lock);
2580 g_mutex_unlock (&stream->fragment_download_lock);
2582 /* deactivate and reactivate our ghostpad to make it fresh for a new
2584 gst_pad_set_active (stream->internal_pad, FALSE);
2585 gst_pad_set_active (stream->internal_pad, TRUE);
2590 /* must be called with manifest_lock taken.
2591 * Can temporarily release manifest_lock
2593 static GstFlowReturn
2594 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
2597 GstAdaptiveDemux *demux = stream->demux;
2598 GstFlowReturn ret = GST_FLOW_OK;
2600 if (stream->fragment.header_uri != NULL) {
2601 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
2602 G_GINT64_FORMAT, stream->fragment.header_uri,
2603 stream->fragment.header_range_start, stream->fragment.header_range_end);
2605 stream->downloading_header = TRUE;
2606 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
2607 stream->fragment.header_uri, stream->fragment.header_range_start,
2608 stream->fragment.header_range_end);
2609 stream->downloading_header = FALSE;
2612 /* check if we have an index */
2613 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
2615 if (stream->fragment.index_uri != NULL) {
2616 GST_DEBUG_OBJECT (demux,
2617 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
2618 stream->fragment.index_uri,
2619 stream->fragment.index_range_start, stream->fragment.index_range_end);
2620 stream->downloading_index = TRUE;
2621 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
2622 stream->fragment.index_uri, stream->fragment.index_range_start,
2623 stream->fragment.index_range_end);
2624 stream->downloading_index = FALSE;
2631 /* must be called with manifest_lock taken.
2632 * Can temporarily release manifest_lock
2634 static GstFlowReturn
2635 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
2637 GstAdaptiveDemux *demux = stream->demux;
2639 GstFlowReturn ret = GST_FLOW_OK;
2641 stream->starting_fragment = TRUE;
2642 stream->last_ret = GST_FLOW_OK;
2643 stream->first_fragment_buffer = TRUE;
2645 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
2646 stream->fragment.index_uri == NULL)
2649 if (stream->need_header) {
2650 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
2651 if (ret != GST_FLOW_OK) {
2654 stream->need_header = FALSE;
2657 url = stream->fragment.uri;
2658 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
2661 gst_adaptive_demux_stream_download_uri (demux, stream, url,
2662 stream->fragment.range_start, stream->fragment.range_end);
2663 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d %s",
2664 stream->last_ret, gst_flow_get_name (stream->last_ret));
2665 if (ret != GST_FLOW_OK) {
2666 g_mutex_lock (&stream->fragment_download_lock);
2667 if (G_UNLIKELY (stream->cancelled)) {
2668 g_mutex_unlock (&stream->fragment_download_lock);
2671 g_mutex_unlock (&stream->fragment_download_lock);
2673 /* TODO check if we are truly stopping */
2674 if (ret == GST_FLOW_CUSTOM_ERROR && gst_adaptive_demux_is_live (demux)) {
2675 if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) {
2676 /* looks like there is no way of knowing when a live stream has ended
2677 * Have to assume we are falling behind and cause a manifest reload */
2678 GST_DEBUG_OBJECT (stream->pad,
2679 "Converting error of live stream to EOS");
2680 return GST_FLOW_EOS;
2682 } else if (ret == GST_FLOW_CUSTOM_ERROR
2683 && !gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
2684 /* If this is the last fragment, consider failures EOS and not actual
2685 * errors. Due to rounding errors in the durations, the last fragment
2686 * might not actually exist */
2687 GST_DEBUG_OBJECT (stream->pad,
2688 "Converting error for last fragment to EOS");
2689 return GST_FLOW_EOS;
2698 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
2699 (_("Failed to get fragment URL.")),
2700 ("An error happened when getting fragment URL"));
2701 gst_task_stop (stream->download_task);
2702 return GST_FLOW_ERROR;
2706 /* this function will take the manifest_lock and will keep it until the end.
2707 * It will release it temporarily only when going to sleep.
2708 * Every time it takes the manifest_lock, it will check for cancelled condition
2711 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
2713 GstAdaptiveDemux *demux = stream->demux;
2714 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
2718 GST_LOG_OBJECT (stream->pad, "download loop start");
2720 GST_MANIFEST_LOCK (demux);
2722 g_mutex_lock (&stream->fragment_download_lock);
2723 if (G_UNLIKELY (stream->cancelled)) {
2724 stream->last_ret = GST_FLOW_FLUSHING;
2725 g_mutex_unlock (&stream->fragment_download_lock);
2728 g_mutex_unlock (&stream->fragment_download_lock);
2730 /* Check if we're done with our segment */
2731 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2732 if (demux->segment.rate > 0) {
2733 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
2734 && stream->segment.position >= stream->segment.stop) {
2735 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2737 gst_task_stop (stream->download_task);
2738 goto end_of_manifest;
2741 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
2742 && stream->segment.position <= stream->segment.start) {
2743 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2745 gst_task_stop (stream->download_task);
2746 goto end_of_manifest;
2749 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2751 /* Cleanup old streams if any */
2752 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
2753 GList *old_streams = demux->priv->old_streams;
2754 demux->priv->old_streams = NULL;
2756 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
2757 g_list_free_full (old_streams,
2758 (GDestroyNotify) gst_adaptive_demux_stream_free);
2759 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
2761 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
2762 * Recheck the cancelled flag.
2764 g_mutex_lock (&stream->fragment_download_lock);
2765 if (G_UNLIKELY (stream->cancelled)) {
2766 stream->last_ret = GST_FLOW_FLUSHING;
2767 g_mutex_unlock (&stream->fragment_download_lock);
2770 g_mutex_unlock (&stream->fragment_download_lock);
2773 if (G_UNLIKELY (stream->restart_download)) {
2774 GstEvent *seg_event;
2775 GstClockTime cur, ts = 0;
2778 GST_DEBUG_OBJECT (stream->pad,
2779 "Activating stream due to reconfigure event");
2781 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
2782 ts = (GstClockTime) pos;
2783 GST_DEBUG_OBJECT (demux, "Downstream position: %"
2784 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2786 /* query other pads as some faulty element in the pad's branch might
2787 * reject position queries. This should be better than using the
2788 * demux segment position that can be much ahead */
2791 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
2792 GstAdaptiveDemuxStream *cur_stream =
2793 (GstAdaptiveDemuxStream *) iter->data;
2795 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
2797 ts = (GstClockTime) pos;
2798 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
2799 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2805 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2807 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
2808 stream->segment.position);
2810 /* we might have already pushed this data */
2813 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
2814 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2816 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2817 GstClockTime offset, period_start;
2820 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2821 period_start = gst_adaptive_demux_get_period_start_time (demux);
2823 /* TODO check return */
2824 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
2827 stream->segment.position = ts - period_start + offset;
2830 /* The stream's segment is still correct except for
2831 * the position, so let's send a new one with the
2832 * updated position */
2833 seg_event = gst_event_new_segment (&stream->segment);
2834 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
2835 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2837 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
2838 GST_PTR_FORMAT, seg_event);
2839 gst_pad_push_event (stream->pad, seg_event);
2841 stream->discont = TRUE;
2842 stream->restart_download = FALSE;
2845 live = gst_adaptive_demux_is_live (demux);
2847 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
2848 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
2849 ret, gst_flow_get_name (ret));
2850 if (ret == GST_FLOW_OK) {
2852 /* wait for live fragments to be available */
2855 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
2856 if (wait_time > 0) {
2857 GstClockTime end_time =
2858 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
2860 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
2861 GST_TIME_ARGS (wait_time));
2863 GST_MANIFEST_UNLOCK (demux);
2865 g_mutex_lock (&stream->fragment_download_lock);
2866 if (G_UNLIKELY (stream->cancelled)) {
2867 g_mutex_unlock (&stream->fragment_download_lock);
2868 GST_MANIFEST_LOCK (demux);
2869 stream->last_ret = GST_FLOW_FLUSHING;
2872 gst_adaptive_demux_wait_until (demux->realtime_clock,
2873 &stream->fragment_download_cond, &stream->fragment_download_lock,
2875 g_mutex_unlock (&stream->fragment_download_lock);
2877 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
2879 GST_MANIFEST_LOCK (demux);
2881 g_mutex_lock (&stream->fragment_download_lock);
2882 if (G_UNLIKELY (stream->cancelled)) {
2883 stream->last_ret = GST_FLOW_FLUSHING;
2884 g_mutex_unlock (&stream->fragment_download_lock);
2887 g_mutex_unlock (&stream->fragment_download_lock);
2891 stream->last_ret = GST_FLOW_OK;
2893 next_download = gst_adaptive_demux_get_monotonic_time (demux);
2894 ret = gst_adaptive_demux_stream_download_fragment (stream);
2896 if (ret == GST_FLOW_FLUSHING) {
2897 g_mutex_lock (&stream->fragment_download_lock);
2898 if (G_UNLIKELY (stream->cancelled)) {
2899 stream->last_ret = GST_FLOW_FLUSHING;
2900 g_mutex_unlock (&stream->fragment_download_lock);
2903 g_mutex_unlock (&stream->fragment_download_lock);
2907 stream->last_ret = ret;
2912 break; /* all is good, let's go */
2914 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
2915 /* we push the EOS after releasing the object lock */
2916 if (gst_adaptive_demux_is_live (demux)) {
2917 if (gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
2920 gst_task_stop (stream->download_task);
2922 gst_task_stop (stream->download_task);
2923 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
2924 if (gst_adaptive_demux_has_next_period (demux)) {
2925 gst_adaptive_demux_advance_period (demux);
2932 case GST_FLOW_NOT_LINKED:
2933 gst_task_stop (stream->download_task);
2934 if (gst_adaptive_demux_combine_flows (demux)
2935 == GST_FLOW_NOT_LINKED) {
2936 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2937 (_("Internal data stream error.")), ("stream stopped, reason %s",
2938 gst_flow_get_name (GST_FLOW_NOT_LINKED)));
2942 case GST_FLOW_FLUSHING:{
2945 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2946 GstAdaptiveDemuxStream *other;
2949 gst_task_stop (other->download_task);
2955 if (ret <= GST_FLOW_ERROR) {
2956 gboolean is_live = gst_adaptive_demux_is_live (demux);
2957 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
2958 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
2959 goto download_error;
2962 g_clear_error (&stream->last_error);
2964 /* First try to update the playlist for non-live playlists
2965 * in case the URIs have changed in the meantime. But only
2966 * try it the first time, after that we're going to wait a
2967 * a bit to not flood the server */
2968 if (stream->download_error_count == 1 && !is_live) {
2969 /* TODO hlsdemux had more options to this function (boolean and err) */
2971 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
2972 /* Retry immediately, the playlist actually has changed */
2973 GST_DEBUG_OBJECT (demux, "Updated the playlist");
2978 /* Wait half the fragment duration before retrying */
2979 next_download += stream->fragment.duration / 2;
2981 GST_MANIFEST_UNLOCK (demux);
2983 g_mutex_lock (&stream->fragment_download_lock);
2984 if (G_UNLIKELY (stream->cancelled)) {
2985 g_mutex_unlock (&stream->fragment_download_lock);
2986 GST_MANIFEST_LOCK (demux);
2987 stream->last_ret = GST_FLOW_FLUSHING;
2990 gst_adaptive_demux_wait_until (demux->realtime_clock,
2991 &stream->fragment_download_cond, &stream->fragment_download_lock,
2993 g_mutex_unlock (&stream->fragment_download_lock);
2995 GST_DEBUG_OBJECT (demux, "Retrying now");
2997 GST_MANIFEST_LOCK (demux);
2999 g_mutex_lock (&stream->fragment_download_lock);
3000 if (G_UNLIKELY (stream->cancelled)) {
3001 stream->last_ret = GST_FLOW_FLUSHING;
3002 g_mutex_unlock (&stream->fragment_download_lock);
3005 g_mutex_unlock (&stream->fragment_download_lock);
3007 /* Refetch the playlist now after we waited */
3009 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3010 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3018 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3019 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3023 GST_MANIFEST_UNLOCK (demux);
3024 GST_LOG_OBJECT (stream->pad, "download loop end");
3029 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3036 if (stream->last_error) {
3037 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3038 GST_DEBUG_PAD_NAME (stream->pad));
3040 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3042 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3043 stream->last_error->message);
3047 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3048 _("Couldn't download fragments"));
3050 gst_message_new_error (GST_OBJECT_CAST (demux), err,
3051 "Fragment downloading has failed consecutive times");
3053 GST_ERROR_OBJECT (stream->pad,
3054 "Download error: Couldn't download fragments, too many failures");
3057 gst_task_stop (stream->download_task);
3059 gst_element_set_state (stream->src, GST_STATE_NULL);
3060 gst_bin_remove (GST_BIN_CAST (demux), stream->src);
3064 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3071 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3073 GstClockTime next_update;
3074 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3076 /* Loop for updating of the playlist. This periodically checks if
3077 * the playlist is updated and does so, then signals the streaming
3078 * thread in case it can continue downloading now. */
3080 /* block until the next scheduled update or the signal to quit this thread */
3081 GST_DEBUG_OBJECT (demux, "Started updates task");
3083 GST_MANIFEST_LOCK (demux);
3086 gst_adaptive_demux_get_monotonic_time (demux) +
3087 klass->get_manifest_update_interval (demux) * GST_USECOND;
3089 /* Updating playlist only needed for live playlists */
3090 while (gst_adaptive_demux_is_live (demux)) {
3091 GstFlowReturn ret = GST_FLOW_OK;
3093 /* Wait here until we should do the next update or we're cancelled */
3094 GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
3096 GST_MANIFEST_UNLOCK (demux);
3098 g_mutex_lock (&demux->priv->updates_timed_lock);
3099 if (demux->priv->stop_updates_task) {
3100 g_mutex_unlock (&demux->priv->updates_timed_lock);
3103 gst_adaptive_demux_wait_until (demux->realtime_clock,
3104 &demux->priv->updates_timed_cond,
3105 &demux->priv->updates_timed_lock, next_update);
3106 g_mutex_unlock (&demux->priv->updates_timed_lock);
3108 GST_MANIFEST_LOCK (demux);
3110 g_mutex_lock (&demux->priv->updates_timed_lock);
3111 if (demux->priv->stop_updates_task) {
3112 g_mutex_unlock (&demux->priv->updates_timed_lock);
3113 GST_MANIFEST_UNLOCK (demux);
3116 g_mutex_unlock (&demux->priv->updates_timed_lock);
3118 GST_DEBUG_OBJECT (demux, "Updating playlist");
3120 ret = gst_adaptive_demux_update_manifest (demux);
3122 if (ret == GST_FLOW_EOS) {
3123 } else if (ret != GST_FLOW_OK) {
3124 /* update_failed_count is used only here, no need to protect it */
3125 demux->priv->update_failed_count++;
3126 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3127 GST_WARNING_OBJECT (demux, "Could not update the playlist");
3128 next_update = gst_adaptive_demux_get_monotonic_time (demux)
3129 + klass->get_manifest_update_interval (demux) * GST_USECOND;
3131 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3132 (_("Internal data stream error.")), ("Could not update playlist"));
3133 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
3134 gst_task_stop (demux->priv->updates_task);
3135 GST_MANIFEST_UNLOCK (demux);
3139 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3141 gst_adaptive_demux_get_monotonic_time (demux) +
3142 klass->get_manifest_update_interval (demux) * GST_USECOND;
3144 /* Wake up download tasks */
3145 g_mutex_lock (&demux->priv->manifest_update_lock);
3146 g_cond_broadcast (&demux->priv->manifest_cond);
3147 g_mutex_unlock (&demux->priv->manifest_update_lock);
3151 GST_MANIFEST_UNLOCK (demux);
3155 GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
3164 /* must be called with manifest_lock taken */
3166 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
3171 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3174 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
3175 "Pushing event %" GST_PTR_FORMAT, event);
3176 ret = gst_pad_push_event (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream), event);
3180 /* must be called with manifest_lock taken */
3182 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3184 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3187 return klass->is_live (demux);
3191 /* must be called with manifest_lock taken */
3192 static GstFlowReturn
3193 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
3194 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
3195 GstClockTime ts, GstClockTime * final_ts)
3197 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3199 if (klass->stream_seek)
3200 return klass->stream_seek (stream, forward, flags, ts, final_ts);
3201 return GST_FLOW_ERROR;
3204 /* must be called with manifest_lock taken */
3206 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
3207 GstAdaptiveDemuxStream * stream)
3209 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3210 gboolean ret = TRUE;
3212 if (klass->stream_has_next_fragment)
3213 ret = klass->stream_has_next_fragment (stream);
3218 /* must be called with manifest_lock taken */
3220 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
3221 GstAdaptiveDemuxStream * stream, GstClockTime duration)
3225 if (stream->last_ret == GST_FLOW_OK) {
3227 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
3230 ret = stream->last_ret;
3235 /* must be called with manifest_lock taken */
3237 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3238 GstAdaptiveDemuxStream * stream, GstClockTime duration)
3240 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3243 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3245 stream->download_error_count = 0;
3246 g_clear_error (&stream->last_error);
3248 /* FIXME - url has no indication of byte ranges for subsegments */
3249 gst_element_post_message (GST_ELEMENT_CAST (demux),
3250 gst_message_new_element (GST_OBJECT_CAST (demux),
3251 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3252 "manifest-uri", G_TYPE_STRING,
3253 demux->manifest_uri, "uri", G_TYPE_STRING,
3254 stream->fragment.uri, "fragment-start-time",
3255 GST_TYPE_CLOCK_TIME, stream->download_start_time,
3256 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3257 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3258 stream->download_total_bytes, "fragment-download-time",
3259 GST_TYPE_CLOCK_TIME,
3260 stream->download_total_time * GST_USECOND, NULL)));
3262 /* Don't update to the end of the segment if in reverse playback */
3263 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3264 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3265 GstClockTime offset =
3266 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3267 GstClockTime period_start =
3268 gst_adaptive_demux_get_period_start_time (demux);
3270 stream->segment.position += duration;
3272 /* Convert from position inside the stream's segment to the demuxer's
3273 * segment, they are not necessarily the same */
3274 if (stream->segment.position - offset + period_start >
3275 demux->segment.position)
3276 demux->segment.position =
3277 stream->segment.position - offset + period_start;
3279 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3281 if (gst_adaptive_demux_is_live (demux)
3282 || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3283 ret = klass->stream_advance_fragment (stream);
3288 stream->download_start_time = stream->download_chunk_start_time =
3289 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3291 if (ret == GST_FLOW_OK) {
3292 if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
3293 gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
3294 stream->need_header = TRUE;
3295 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3298 /* the subclass might want to switch pads */
3299 if (G_UNLIKELY (demux->next_streams)) {
3300 gst_task_stop (stream->download_task);
3301 /* TODO only allow switching streams if other downloads are not ongoing */
3302 GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
3303 "to do bitrate switching");
3304 gst_adaptive_demux_expose_streams (demux, FALSE);
3305 gst_adaptive_demux_start_tasks (demux);
3313 /* must be called with manifest_lock taken */
3315 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
3316 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
3318 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3320 /* FIXME: Currently several issues have be found when letting bitrate adaptation
3321 * happen using trick modes (such as 'All streams finished without buffers') and
3322 * the adaptive algorithm does not properly behave. */
3323 if (demux->segment.rate != 1.0)
3326 if (klass->stream_select_bitrate)
3327 return klass->stream_select_bitrate (stream, bitrate);
3331 /* must be called with manifest_lock taken */
3332 static GstFlowReturn
3333 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
3334 GstAdaptiveDemuxStream * stream)
3336 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3338 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3341 /* Make sure the sub-class will update bitrate, or else
3343 stream->fragment.bitrate = 0;
3345 return klass->stream_update_fragment_info (stream);
3348 /* must be called with manifest_lock taken */
3350 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3351 demux, GstAdaptiveDemuxStream * stream)
3353 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3355 if (klass->stream_get_fragment_waiting_time)
3356 return klass->stream_get_fragment_waiting_time (stream);
3360 /* must be called with manifest_lock taken */
3361 static GstFlowReturn
3362 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3364 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3365 GstFragment *download;
3369 download = gst_uri_downloader_fetch_uri (demux->downloader,
3370 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, NULL);
3372 g_free (demux->manifest_uri);
3373 g_free (demux->manifest_base_uri);
3374 if (download->redirect_permanent && download->redirect_uri) {
3375 demux->manifest_uri = g_strdup (download->redirect_uri);
3376 demux->manifest_base_uri = NULL;
3378 demux->manifest_uri = g_strdup (download->uri);
3379 demux->manifest_base_uri = g_strdup (download->redirect_uri);
3382 buffer = gst_fragment_get_buffer (download);
3383 g_object_unref (download);
3384 ret = klass->update_manifest_data (demux, buffer);
3385 gst_buffer_unref (buffer);
3386 /* FIXME: Should the manifest uri vars be reverted to original
3387 * values if updating fails? */
3389 ret = GST_FLOW_NOT_LINKED;
3395 /* must be called with manifest_lock taken */
3396 static GstFlowReturn
3397 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3399 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3402 ret = klass->update_manifest (demux);
3404 if (ret == GST_FLOW_OK) {
3405 GstClockTime duration;
3406 /* Send an updated duration message */
3407 duration = klass->get_duration (demux);
3408 if (duration != GST_CLOCK_TIME_NONE) {
3409 GST_DEBUG_OBJECT (demux,
3410 "Sending duration message : %" GST_TIME_FORMAT,
3411 GST_TIME_ARGS (duration));
3412 gst_element_post_message (GST_ELEMENT (demux),
3413 gst_message_new_duration_changed (GST_OBJECT (demux)));
3415 GST_DEBUG_OBJECT (demux,
3416 "Duration unknown, can not send the duration message");
3424 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
3431 g_free (f->header_uri);
3432 f->header_uri = NULL;
3433 f->header_range_start = 0;
3434 f->header_range_end = -1;
3436 g_free (f->index_uri);
3437 f->index_uri = NULL;
3438 f->index_range_start = 0;
3439 f->index_range_end = -1;
3442 /* must be called with manifest_lock taken */
3444 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3446 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3447 gboolean ret = FALSE;
3449 if (klass->has_next_period)
3450 ret = klass->has_next_period (demux);
3451 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3455 /* must be called with manifest_lock taken */
3457 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3459 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3461 g_return_if_fail (klass->advance_period != NULL);
3463 GST_DEBUG_OBJECT (demux, "Advancing to next period");
3464 klass->advance_period (demux);
3465 gst_adaptive_demux_expose_streams (demux, FALSE);
3466 gst_adaptive_demux_start_tasks (demux);
3470 * gst_adaptive_demux_get_monotonic_time:
3471 * Returns: a monotonically increasing time, using the system realtime clock
3474 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
3476 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3477 return gst_clock_get_time (demux->realtime_clock);
3481 * gst_adaptive_demux_get_client_now_utc:
3482 * @demux: #GstAdaptiveDemux
3483 * Returns: the client's estimate of UTC
3485 * Used to find the client's estimate of UTC, using the system realtime clock.
3488 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
3490 GstClockTime rtc_now;
3494 rtc_now = gst_clock_get_time (demux->realtime_clock);
3495 utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
3496 gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
3497 gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
3498 return g_date_time_new_from_timeval_utc (>v);
3501 static GstAdaptiveDemuxTimer *
3502 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
3504 GstAdaptiveDemuxTimer *timer;
3506 timer = g_slice_new (GstAdaptiveDemuxTimer);
3507 timer->fired = FALSE;
3509 timer->mutex = mutex;
3510 timer->ref_count = 1;
3514 static GstAdaptiveDemuxTimer *
3515 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
3517 g_return_val_if_fail (timer != NULL, NULL);
3518 g_atomic_int_inc (&timer->ref_count);
3523 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
3525 g_return_if_fail (timer != NULL);
3526 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
3527 g_slice_free (GstAdaptiveDemuxTimer, timer);
3531 /* gst_adaptive_demux_wait_until:
3532 * A replacement for g_cond_wait_until that uses the clock rather
3533 * than system time to control the duration of the sleep. Typically
3534 * clock is actually a #GstSystemClock, in which case this function
3535 * behaves exactly like g_cond_wait_until. Inside unit tests,
3536 * the clock is typically a #GstTestClock, which allows tests to run
3538 * This function must be called with mutex held.
3541 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
3542 GstClockTime end_time)
3544 GstAdaptiveDemuxTimer *timer;
3548 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
3549 /* for an invalid time, gst_clock_id_wait_async will try to call
3550 * gst_adaptive_demux_clock_callback from the current thread.
3551 * It still holds the mutex while doing that, so it will deadlock.
3552 * g_cond_wait_until would return immediately with false, so we'll do the same.
3556 timer = gst_adaptive_demux_timer_new (cond, mutex);
3557 timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
3559 gst_clock_id_wait_async (timer->clock_id,
3560 gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
3561 (GDestroyNotify) gst_adaptive_demux_timer_unref);
3562 /* clock does not support asynchronously wait. Assert and return */
3563 if (res == GST_CLOCK_UNSUPPORTED) {
3564 gst_clock_id_unref (timer->clock_id);
3565 gst_adaptive_demux_timer_unref (timer);
3566 g_return_val_if_reached (TRUE);
3568 g_assert (!timer->fired);
3569 /* the gst_adaptive_demux_clock_callback() will signal the
3570 * cond when the clock's single shot timer fires, or the cond will be
3571 * signalled by another thread that wants to cause this wait to finish
3572 * early (e.g. to terminate the waiting thread).
3573 * There is no need for a while loop here, because that logic is
3574 * implemented by the function calling gst_adaptive_demux_wait_until() */
3575 g_cond_wait (cond, mutex);
3576 fired = timer->fired;
3578 gst_clock_id_unschedule (timer->clock_id);
3579 gst_clock_id_unref (timer->clock_id);
3580 gst_adaptive_demux_timer_unref (timer);
3585 gst_adaptive_demux_clock_callback (GstClock * clock,
3586 GstClockTime time, GstClockID id, gpointer user_data)
3588 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
3589 g_return_val_if_fail (timer != NULL, FALSE);
3590 g_mutex_lock (timer->mutex);
3591 timer->fired = TRUE;
3592 g_cond_signal (timer->cond);
3593 g_mutex_unlock (timer->mutex);