2 * Copyright (C) 2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
3 * Copyright (C) 2010 Andoni Morales Alastruey <ylatuya@gmail.com>
4 * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
5 * Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
26 * SECTION:element-hlsdemux
28 * HTTP Live Streaming demuxer element.
31 * <title>Example launch line</title>
33 * gst-launch souphttpsrc location=http://devimages.apple.com/iphone/samples/bipbop/gear4/prog_index.m3u8 ! hlsdemux ! decodebin2 ! videoconvert ! videoscale ! autovideosink
37 * Last reviewed on 2010-10-07
44 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
45 * with newer GLib versions (>= 2.31.0) */
46 #define GLIB_DISABLE_DEPRECATION_WARNINGS
49 #include <gst/glib-compat-private.h>
50 #include <gnutls/gnutls.h>
51 #include <gnutls/crypto.h>
52 #include "gsthlsdemux.h"
54 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
59 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
62 GST_STATIC_CAPS ("application/x-hls"));
64 GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug);
65 #define GST_CAT_DEFAULT gst_hls_demux_debug
73 PROP_CONNECTION_SPEED,
77 static const float update_interval_factor[] = { 1, 0.5, 1.5, 3 };
79 #define DEFAULT_FRAGMENTS_CACHE 3
80 #define DEFAULT_FAILED_COUNT 3
81 #define DEFAULT_BITRATE_LIMIT 0.8
82 #define DEFAULT_CONNECTION_SPEED 0
85 static void gst_hls_demux_set_property (GObject * object, guint prop_id,
86 const GValue * value, GParamSpec * pspec);
87 static void gst_hls_demux_get_property (GObject * object, guint prop_id,
88 GValue * value, GParamSpec * pspec);
89 static void gst_hls_demux_dispose (GObject * obj);
92 static GstStateChangeReturn
93 gst_hls_demux_change_state (GstElement * element, GstStateChange transition);
96 static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstObject * parent,
98 static gboolean gst_hls_demux_sink_event (GstPad * pad, GstObject * parent,
100 static gboolean gst_hls_demux_src_event (GstPad * pad, GstObject * parent,
102 static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent,
104 static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
105 static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
106 static void gst_hls_demux_stop (GstHLSDemux * demux);
107 static void gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching);
108 static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux);
109 static gboolean gst_hls_demux_schedule (GstHLSDemux * demux);
110 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux);
111 static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
113 static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
115 static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose);
116 static gboolean gst_hls_demux_set_location (GstHLSDemux * demux,
118 static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
120 #define gst_hls_demux_parent_class parent_class
121 G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
124 gst_hls_demux_dispose (GObject * obj)
126 GstHLSDemux *demux = GST_HLS_DEMUX (obj);
128 if (demux->stream_task) {
129 if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
130 GST_DEBUG_OBJECT (demux, "Leaving streaming task");
131 gst_task_stop (demux->stream_task);
132 g_rec_mutex_lock (&demux->stream_lock);
133 g_rec_mutex_unlock (&demux->stream_lock);
134 gst_task_join (demux->stream_task);
136 gst_object_unref (demux->stream_task);
137 g_rec_mutex_clear (&demux->stream_lock);
138 demux->stream_task = NULL;
141 if (demux->updates_task) {
142 if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
143 GST_DEBUG_OBJECT (demux, "Leaving updates task");
144 demux->cancelled = TRUE;
145 gst_uri_downloader_cancel (demux->downloader);
146 gst_task_stop (demux->updates_task);
147 g_mutex_lock (&demux->updates_timed_lock);
148 GST_TASK_SIGNAL (demux->updates_task);
149 g_rec_mutex_lock (&demux->updates_lock);
150 g_rec_mutex_unlock (&demux->updates_lock);
151 g_mutex_unlock (&demux->updates_timed_lock);
152 gst_task_join (demux->updates_task);
154 gst_object_unref (demux->updates_task);
155 g_mutex_clear (&demux->updates_timed_lock);
156 g_rec_mutex_clear (&demux->updates_lock);
157 demux->updates_task = NULL;
160 if (demux->downloader != NULL) {
161 g_object_unref (demux->downloader);
162 demux->downloader = NULL;
165 gst_hls_demux_reset (demux, TRUE);
167 g_queue_free (demux->queue);
169 G_OBJECT_CLASS (parent_class)->dispose (obj);
173 gst_hls_demux_class_init (GstHLSDemuxClass * klass)
175 GObjectClass *gobject_class;
176 GstElementClass *element_class;
178 gobject_class = (GObjectClass *) klass;
179 element_class = (GstElementClass *) klass;
181 gobject_class->set_property = gst_hls_demux_set_property;
182 gobject_class->get_property = gst_hls_demux_get_property;
183 gobject_class->dispose = gst_hls_demux_dispose;
185 g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
186 g_param_spec_uint ("fragments-cache", "Fragments cache",
187 "Number of fragments needed to be cached to start playing",
188 2, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
189 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
192 g_param_spec_float ("bitrate-limit",
193 "Bitrate limit in %",
194 "Limit of the available bitrate to use when switching to alternates.",
195 0, 1, DEFAULT_BITRATE_LIMIT,
196 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
198 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
199 g_param_spec_uint ("connection-speed", "Connection Speed",
200 "Network connection speed in kbps (0 = unknown)",
201 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
202 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204 element_class->change_state = GST_DEBUG_FUNCPTR (gst_hls_demux_change_state);
206 gst_element_class_add_pad_template (element_class,
207 gst_static_pad_template_get (&srctemplate));
209 gst_element_class_add_pad_template (element_class,
210 gst_static_pad_template_get (&sinktemplate));
212 gst_element_class_set_static_metadata (element_class,
215 "HTTP Live Streaming demuxer",
216 "Marc-Andre Lureau <marcandre.lureau@gmail.com>\n"
217 "Andoni Morales Alastruey <ylatuya@gmail.com>");
219 GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
224 gst_hls_demux_init (GstHLSDemux * demux)
227 demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
228 gst_pad_set_chain_function (demux->sinkpad,
229 GST_DEBUG_FUNCPTR (gst_hls_demux_chain));
230 gst_pad_set_event_function (demux->sinkpad,
231 GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event));
232 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
235 demux->downloader = gst_uri_downloader_new ();
237 demux->do_typefind = TRUE;
240 demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
241 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
242 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
244 demux->queue = g_queue_new ();
247 g_rec_mutex_init (&demux->updates_lock);
248 demux->updates_task =
249 gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux, NULL);
250 gst_task_set_lock (demux->updates_task, &demux->updates_lock);
251 g_mutex_init (&demux->updates_timed_lock);
254 g_rec_mutex_init (&demux->stream_lock);
256 gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux, NULL);
257 gst_task_set_lock (demux->stream_task, &demux->stream_lock);
259 demux->have_group_id = FALSE;
260 demux->group_id = G_MAXUINT;
264 gst_hls_demux_set_property (GObject * object, guint prop_id,
265 const GValue * value, GParamSpec * pspec)
267 GstHLSDemux *demux = GST_HLS_DEMUX (object);
270 case PROP_FRAGMENTS_CACHE:
271 demux->fragments_cache = g_value_get_uint (value);
273 case PROP_BITRATE_LIMIT:
274 demux->bitrate_limit = g_value_get_float (value);
276 case PROP_CONNECTION_SPEED:
277 demux->connection_speed = g_value_get_uint (value) * 1000;
280 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
286 gst_hls_demux_get_property (GObject * object, guint prop_id, GValue * value,
289 GstHLSDemux *demux = GST_HLS_DEMUX (object);
292 case PROP_FRAGMENTS_CACHE:
293 g_value_set_uint (value, demux->fragments_cache);
295 case PROP_BITRATE_LIMIT:
296 g_value_set_float (value, demux->bitrate_limit);
298 case PROP_CONNECTION_SPEED:
299 g_value_set_uint (value, demux->connection_speed / 1000);
302 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
307 static GstStateChangeReturn
308 gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
310 GstStateChangeReturn ret;
311 GstHLSDemux *demux = GST_HLS_DEMUX (element);
313 switch (transition) {
314 case GST_STATE_CHANGE_READY_TO_PAUSED:
315 gst_hls_demux_reset (demux, FALSE);
321 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
323 switch (transition) {
324 case GST_STATE_CHANGE_PAUSED_TO_READY:
325 demux->cancelled = TRUE;
326 gst_hls_demux_stop (demux);
327 gst_task_join (demux->stream_task);
328 gst_hls_demux_reset (demux, FALSE);
337 gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
341 demux = GST_HLS_DEMUX (parent);
343 switch (event->type) {
349 GstSeekType start_type, stop_type;
352 GstClockTime position, current_pos, target_pos;
353 gint current_sequence;
354 GstM3U8MediaFile *file;
356 GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
358 if (gst_m3u8_client_is_live (demux->client)) {
359 GST_WARNING_OBJECT (demux, "Received seek event for live stream");
363 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
366 if (format != GST_FORMAT_TIME)
369 GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
370 " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
371 GST_TIME_ARGS (stop));
373 GST_M3U8_CLIENT_LOCK (demux->client);
374 file = GST_M3U8_MEDIA_FILE (demux->client->current->files->data);
375 current_sequence = file->sequence;
377 target_pos = (GstClockTime) start;
378 for (walk = demux->client->current->files; walk; walk = walk->next) {
381 current_sequence = file->sequence;
382 if (current_pos <= target_pos
383 && target_pos < current_pos + file->duration) {
386 current_pos += file->duration;
388 GST_M3U8_CLIENT_UNLOCK (demux->client);
391 GST_WARNING_OBJECT (demux, "Could not find seeked fragment");
395 if (flags & GST_SEEK_FLAG_FLUSH) {
396 GST_DEBUG_OBJECT (demux, "sending flush start");
397 gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ());
400 demux->cancelled = TRUE;
401 gst_task_pause (demux->stream_task);
402 gst_uri_downloader_cancel (demux->downloader);
403 gst_task_stop (demux->updates_task);
404 g_mutex_lock (&demux->updates_timed_lock);
405 GST_TASK_SIGNAL (demux->updates_task);
406 g_mutex_unlock (&demux->updates_timed_lock);
407 g_rec_mutex_lock (&demux->updates_lock);
408 g_rec_mutex_unlock (&demux->updates_lock);
409 gst_task_pause (demux->stream_task);
411 /* wait for streaming to finish */
412 g_rec_mutex_lock (&demux->stream_lock);
414 demux->need_cache = TRUE;
415 while (!g_queue_is_empty (demux->queue)) {
416 GstFragment *fragment = g_queue_pop_head (demux->queue);
417 g_object_unref (fragment);
419 g_queue_clear (demux->queue);
421 GST_M3U8_CLIENT_LOCK (demux->client);
422 GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence);
423 demux->client->sequence = current_sequence;
424 gst_m3u8_client_get_current_position (demux->client, &position);
425 demux->position_shift = start - position;
426 demux->need_segment = TRUE;
427 GST_M3U8_CLIENT_UNLOCK (demux->client);
430 if (flags & GST_SEEK_FLAG_FLUSH) {
431 GST_DEBUG_OBJECT (demux, "sending flush stop");
432 gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE));
435 demux->cancelled = FALSE;
436 gst_task_start (demux->stream_task);
437 g_rec_mutex_unlock (&demux->stream_lock);
445 return gst_pad_event_default (pad, parent, event);
449 gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
456 demux = GST_HLS_DEMUX (parent);
458 switch (event->type) {
460 gchar *playlist = NULL;
462 if (demux->playlist == NULL) {
463 GST_WARNING_OBJECT (demux, "Received EOS without a playlist.");
467 GST_DEBUG_OBJECT (demux,
468 "Got EOS on the sink pad: main playlist fetched");
470 query = gst_query_new_uri ();
471 ret = gst_pad_peer_query (demux->sinkpad, query);
473 gst_query_parse_uri_redirection (query, &uri);
475 gst_query_parse_uri (query, &uri);
476 gst_hls_demux_set_location (demux, uri);
479 gst_query_unref (query);
481 playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist);
482 demux->playlist = NULL;
483 if (playlist == NULL) {
484 GST_WARNING_OBJECT (demux, "Error validating first playlist.");
485 } else if (!gst_m3u8_client_update (demux->client, playlist)) {
486 /* In most cases, this will happen if we set a wrong url in the
487 * source element and we have received the 404 HTML response instead of
489 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
494 if (!ret && gst_m3u8_client_is_live (demux->client)) {
495 GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
496 ("Failed querying the playlist uri, "
497 "required for live sources."), (NULL));
501 gst_task_start (demux->stream_task);
502 gst_event_unref (event);
505 case GST_EVENT_SEGMENT:
506 /* Swallow newsegments, we'll push our own */
507 gst_event_unref (event);
513 return gst_pad_event_default (pad, parent, event);
517 gst_hls_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
519 GstHLSDemux *hlsdemux;
520 gboolean ret = FALSE;
525 hlsdemux = GST_HLS_DEMUX (parent);
527 switch (query->type) {
528 case GST_QUERY_DURATION:{
529 GstClockTime duration = -1;
532 gst_query_parse_duration (query, &fmt, NULL);
533 if (fmt == GST_FORMAT_TIME) {
534 duration = gst_m3u8_client_get_duration (hlsdemux->client);
535 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
536 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
540 GST_INFO_OBJECT (hlsdemux, "GST_QUERY_DURATION returns %s with duration %"
541 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
545 if (hlsdemux->client) {
546 /* FIXME: Do we answer with the variant playlist, with the current
547 * playlist or the the uri of the least downlowaded fragment? */
548 gst_query_set_uri (query, gst_m3u8_client_get_uri (hlsdemux->client));
552 case GST_QUERY_SEEKING:{
556 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
557 GST_INFO_OBJECT (hlsdemux, "Received GST_QUERY_SEEKING with format %d",
559 if (fmt == GST_FORMAT_TIME) {
560 GstClockTime duration;
562 duration = gst_m3u8_client_get_duration (hlsdemux->client);
563 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
566 gst_query_set_seeking (query, fmt,
567 !gst_m3u8_client_is_live (hlsdemux->client), 0, stop);
569 GST_INFO_OBJECT (hlsdemux, "GST_QUERY_SEEKING returning with stop : %"
570 GST_TIME_FORMAT, GST_TIME_ARGS (stop));
575 /* Don't fordward queries upstream because of the special nature of this
576 * "demuxer", which relies on the upstream element only to be fed with the
585 gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
587 GstHLSDemux *demux = GST_HLS_DEMUX (parent);
589 if (demux->playlist == NULL)
590 demux->playlist = buf;
592 demux->playlist = gst_buffer_append (demux->playlist, buf);
598 gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching)
600 if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
601 demux->cancelled = TRUE;
602 gst_uri_downloader_cancel (demux->downloader);
603 gst_task_pause (demux->updates_task);
605 g_mutex_lock (&demux->updates_timed_lock);
606 GST_TASK_SIGNAL (demux->updates_task);
608 g_mutex_unlock (&demux->updates_timed_lock);
611 if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
612 demux->stop_stream_task = TRUE;
613 gst_task_pause (demux->stream_task);
618 gst_hls_demux_stop (GstHLSDemux * demux)
620 gst_uri_downloader_cancel (demux->downloader);
622 if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
623 demux->cancelled = TRUE;
624 gst_uri_downloader_cancel (demux->downloader);
625 gst_task_stop (demux->updates_task);
626 g_mutex_lock (&demux->updates_timed_lock);
627 GST_TASK_SIGNAL (demux->updates_task);
628 g_mutex_unlock (&demux->updates_timed_lock);
629 g_rec_mutex_lock (&demux->updates_lock);
630 g_rec_mutex_unlock (&demux->updates_lock);
633 if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
634 demux->stop_stream_task = TRUE;
635 gst_task_stop (demux->stream_task);
636 g_rec_mutex_lock (&demux->stream_lock);
637 g_rec_mutex_unlock (&demux->stream_lock);
642 switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
644 GstPad *oldpad = demux->srcpad;
648 GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
651 /* First create and activate new pad */
652 demux->srcpad = gst_pad_new_from_static_template (&srctemplate, NULL);
653 gst_pad_set_event_function (demux->srcpad,
654 GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
655 gst_pad_set_query_function (demux->srcpad,
656 GST_DEBUG_FUNCPTR (gst_hls_demux_src_query));
657 gst_pad_set_element_private (demux->srcpad, demux);
658 gst_pad_set_active (demux->srcpad, TRUE);
661 gst_pad_create_stream_id (demux->srcpad, GST_ELEMENT_CAST (demux), NULL);
663 event = gst_pad_get_sticky_event (demux->sinkpad, GST_EVENT_STREAM_START, 0);
665 if (gst_event_parse_group_id (event, &demux->group_id))
666 demux->have_group_id = TRUE;
668 demux->have_group_id = FALSE;
669 gst_event_unref (event);
670 } else if (!demux->have_group_id) {
671 demux->have_group_id = TRUE;
672 demux->group_id = gst_util_group_id_next ();
674 event = gst_event_new_stream_start (stream_id);
675 if (demux->have_group_id)
676 gst_event_set_group_id (event, demux->group_id);
678 gst_pad_push_event (demux->srcpad, event);
681 gst_pad_set_caps (demux->srcpad, newcaps);
683 gst_element_add_pad (GST_ELEMENT (demux), demux->srcpad);
685 gst_element_no_more_pads (GST_ELEMENT (demux));
689 gst_pad_push_event (oldpad, gst_event_new_eos ());
690 gst_pad_set_active (oldpad, FALSE);
691 gst_element_remove_pad (GST_ELEMENT (demux), oldpad);
696 gst_hls_demux_stream_loop (GstHLSDemux * demux)
698 GstFragment *fragment;
701 GstCaps *bufcaps, *srccaps = NULL;
703 /* Loop for the source pad task. The task is started when we have
704 * received the main playlist from the source element. It tries first to
705 * cache the first fragments and then it waits until it has more data in the
706 * queue. This task is woken up when we push a new fragment to the queue or
707 * when we reached the end of the playlist */
708 GST_DEBUG_OBJECT (demux, "Enter task");
710 if (G_UNLIKELY (demux->need_cache)) {
711 if (!gst_hls_demux_cache_fragments (demux))
714 /* we can start now the updates thread (only if on playing) */
715 gst_task_start (demux->updates_task);
716 GST_INFO_OBJECT (demux, "First fragments cached successfully");
719 if (g_queue_is_empty (demux->queue)) {
720 if (demux->end_of_playlist)
721 goto end_of_playlist;
726 fragment = g_queue_pop_head (demux->queue);
727 buf = gst_fragment_get_buffer (fragment);
729 /* Figure out if we need to create/switch pads */
730 if (G_LIKELY (demux->srcpad))
731 srccaps = gst_pad_get_current_caps (demux->srcpad);
732 bufcaps = gst_fragment_get_caps (fragment);
733 if (G_UNLIKELY (!srccaps || !gst_caps_is_equal_fixed (bufcaps, srccaps)
734 || demux->need_segment)) {
735 switch_pads (demux, bufcaps);
736 demux->need_segment = TRUE;
738 gst_caps_unref (bufcaps);
739 if (G_LIKELY (srccaps))
740 gst_caps_unref (srccaps);
741 g_object_unref (fragment);
743 if (demux->need_segment) {
745 GstClockTime start = GST_BUFFER_PTS (buf);
747 start += demux->position_shift;
748 /* And send a newsegment */
749 GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
750 GST_TIME_FORMAT, GST_TIME_ARGS (start));
751 gst_segment_init (&segment, GST_FORMAT_TIME);
752 segment.start = start;
753 segment.time = start;
754 gst_pad_push_event (demux->srcpad, gst_event_new_segment (&segment));
755 demux->need_segment = FALSE;
756 demux->position_shift = 0;
759 GST_DEBUG_OBJECT (demux, "Pushing buffer %p", buf);
761 ret = gst_pad_push (demux->srcpad, buf);
762 if (ret != GST_FLOW_OK)
765 GST_DEBUG_OBJECT (demux, "Pushed buffer");
771 GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS");
772 gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
773 gst_hls_demux_pause_tasks (demux, FALSE);
779 gst_task_pause (demux->stream_task);
780 if (!demux->cancelled) {
781 GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
782 ("Could not cache the first fragments"), (NULL));
783 gst_hls_demux_pause_tasks (demux, FALSE);
790 if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
791 GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
792 ("stream stopped, reason %s", gst_flow_get_name (ret)));
793 gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
795 GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
796 gst_flow_get_name (ret));
798 gst_hls_demux_pause_tasks (demux, FALSE);
804 GST_DEBUG_OBJECT (demux, "Pause task");
805 gst_task_pause (demux->stream_task);
811 gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
813 demux->need_cache = TRUE;
814 demux->end_of_playlist = FALSE;
815 demux->cancelled = FALSE;
816 demux->do_typefind = TRUE;
818 if (demux->input_caps) {
819 gst_caps_unref (demux->input_caps);
820 demux->input_caps = NULL;
823 if (demux->playlist) {
824 gst_buffer_unref (demux->playlist);
825 demux->playlist = NULL;
829 gst_m3u8_client_free (demux->client);
830 demux->client = NULL;
834 demux->client = gst_m3u8_client_new ("");
837 while (!g_queue_is_empty (demux->queue)) {
838 GstFragment *fragment = g_queue_pop_head (demux->queue);
839 g_object_unref (fragment);
841 g_queue_clear (demux->queue);
843 demux->position_shift = 0;
844 demux->need_segment = TRUE;
846 demux->have_group_id = FALSE;
847 demux->group_id = G_MAXUINT;
851 gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri)
854 gst_m3u8_client_free (demux->client);
855 demux->client = gst_m3u8_client_new (uri);
856 GST_INFO_OBJECT (demux, "Changed location: %s", uri);
861 gst_hls_demux_updates_loop (GstHLSDemux * demux)
863 /* Loop for the updates. It's started when the first fragments are cached and
864 * schedules the next update of the playlist (for lives sources) and the next
865 * update of fragments. When a new fragment is downloaded, it compares the
866 * download time with the next scheduled update to check if we can or should
867 * switch to a different bitrate */
869 /* block until the next scheduled update or the signal to quit this thread */
870 g_mutex_lock (&demux->updates_timed_lock);
871 GST_DEBUG_OBJECT (demux, "Started updates task");
873 if (demux->cancelled)
876 /* schedule the next update */
877 gst_hls_demux_schedule (demux);
879 /* block until the next scheduled update or the signal to quit this thread */
880 GST_DEBUG_OBJECT (demux, "Waiting");
881 if (g_cond_timed_wait (GST_TASK_GET_COND (demux->updates_task),
882 &demux->updates_timed_lock, &demux->next_update)) {
883 GST_DEBUG_OBJECT (demux, "Unlocked");
886 GST_DEBUG_OBJECT (demux, "Continue");
888 if (demux->cancelled)
891 /* update the playlist for live sources */
892 if (gst_m3u8_client_is_live (demux->client)) {
893 if (!gst_hls_demux_update_playlist (demux, TRUE)) {
894 if (demux->cancelled)
896 demux->client->update_failed_count++;
897 if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
898 GST_WARNING_OBJECT (demux, "Could not update the playlist");
901 GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
902 ("Could not update the playlist"), (NULL));
908 /* if it's a live source and the playlist couldn't be updated, there aren't
909 * more fragments in the playlist, so we just wait for the next schedulled
911 if (gst_m3u8_client_is_live (demux->client) &&
912 demux->client->update_failed_count > 0) {
913 GST_WARNING_OBJECT (demux,
914 "The playlist hasn't been updated, failed count is %d",
915 demux->client->update_failed_count);
919 if (demux->cancelled)
922 /* fetch the next fragment */
923 if (g_queue_is_empty (demux->queue)) {
924 GST_DEBUG_OBJECT (demux, "queue empty, get next fragment");
925 if (!gst_hls_demux_get_next_fragment (demux, FALSE)) {
926 if (demux->cancelled) {
928 } else if (!demux->end_of_playlist && !demux->cancelled) {
929 demux->client->update_failed_count++;
930 if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
931 GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
934 GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
935 ("Could not fetch the next fragment"), (NULL));
940 demux->client->update_failed_count = 0;
942 if (demux->cancelled)
945 /* try to switch to another bitrate if needed */
946 gst_hls_demux_switch_playlist (demux);
953 GST_DEBUG_OBJECT (demux, "Stopped updates task");
954 g_mutex_unlock (&demux->updates_timed_lock);
960 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
961 gst_hls_demux_pause_tasks (demux, TRUE);
962 g_mutex_unlock (&demux->updates_timed_lock);
967 gst_hls_demux_cache_fragments (GstHLSDemux * demux)
971 /* If this playlist is a variant playlist, select the first one
973 if (gst_m3u8_client_has_variant_playlist (demux->client)) {
974 GstM3U8 *child = NULL;
976 if (demux->connection_speed == 0) {
978 GST_M3U8_CLIENT_LOCK (demux->client);
979 child = demux->client->main->current_variant->data;
980 GST_M3U8_CLIENT_UNLOCK (demux->client);
982 GList *tmp = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
983 demux->connection_speed);
985 child = GST_M3U8 (tmp->data);
988 gst_m3u8_client_set_current (demux->client, child);
989 if (!gst_hls_demux_update_playlist (demux, FALSE)) {
990 GST_ERROR_OBJECT (demux, "Could not fetch the child playlist %s",
996 if (!gst_m3u8_client_is_live (demux->client)) {
997 GstClockTime duration = gst_m3u8_client_get_duration (demux->client);
999 GST_DEBUG_OBJECT (demux, "Sending duration message : %" GST_TIME_FORMAT,
1000 GST_TIME_ARGS (duration));
1001 if (duration != GST_CLOCK_TIME_NONE)
1002 gst_element_post_message (GST_ELEMENT (demux),
1003 gst_message_new_duration_changed (GST_OBJECT (demux)));
1006 /* Cache the first fragments */
1007 for (i = 0; i < demux->fragments_cache; i++) {
1008 gst_element_post_message (GST_ELEMENT (demux),
1009 gst_message_new_buffering (GST_OBJECT (demux),
1010 100 * i / demux->fragments_cache));
1011 g_get_current_time (&demux->next_update);
1012 if (!gst_hls_demux_get_next_fragment (demux, TRUE)) {
1013 if (demux->end_of_playlist)
1015 if (!demux->cancelled)
1016 GST_ERROR_OBJECT (demux, "Error caching the first fragments");
1019 /* make sure we stop caching fragments if something cancelled it */
1020 if (demux->cancelled)
1022 gst_hls_demux_switch_playlist (demux);
1024 gst_element_post_message (GST_ELEMENT (demux),
1025 gst_message_new_buffering (GST_OBJECT (demux), 100));
1027 g_get_current_time (&demux->next_update);
1029 demux->need_cache = FALSE;
1035 gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf)
1040 if (!gst_buffer_map (buf, &info, GST_MAP_READ))
1043 if (!g_utf8_validate ((gchar *) info.data, info.size, NULL))
1044 goto validate_error;
1046 /* alloc size + 1 to end with a null character */
1047 playlist = g_malloc0 (info.size + 1);
1048 memcpy (playlist, info.data, info.size);
1050 gst_buffer_unmap (buf, &info);
1051 gst_buffer_unref (buf);
1055 gst_buffer_unmap (buf, &info);
1056 gst_buffer_unref (buf);
1061 gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update)
1063 GstFragment *download;
1066 gboolean updated = FALSE;
1068 const gchar *uri = gst_m3u8_client_get_current_uri (demux->client);
1070 download = gst_uri_downloader_fetch_uri (demux->downloader, uri);
1072 if (download == NULL)
1075 buf = gst_fragment_get_buffer (download);
1076 playlist = gst_hls_src_buf_to_utf8_playlist (buf);
1077 g_object_unref (download);
1079 if (playlist == NULL) {
1080 GST_WARNING_OBJECT (demux, "Couldn't not validate playlist encoding");
1084 updated = gst_m3u8_client_update (demux->client, playlist);
1086 /* If it's a live source, do not let the sequence number go beyond
1087 * three fragments before the end of the list */
1088 if (updated && update == FALSE && demux->client->current &&
1089 gst_m3u8_client_is_live (demux->client)) {
1090 guint last_sequence;
1092 GST_M3U8_CLIENT_LOCK (demux->client);
1094 GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
1095 files)->data)->sequence;
1097 if (demux->client->sequence >= last_sequence - 3) {
1098 GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %d",
1100 demux->need_segment = TRUE;
1101 demux->client->sequence = last_sequence - 3;
1103 GST_M3U8_CLIENT_UNLOCK (demux->client);
1110 gst_hls_demux_change_playlist (GstHLSDemux * demux, guint max_bitrate)
1112 GList *previous_variant, *current_variant;
1113 gint old_bandwidth, new_bandwidth;
1115 /* If user specifies a connection speed never use a playlist with a bandwidth
1116 * superior than it */
1117 if (demux->connection_speed != 0 && max_bitrate > demux->connection_speed)
1118 max_bitrate = demux->connection_speed;
1120 previous_variant = demux->client->main->current_variant;
1121 current_variant = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
1124 retry_failover_protection:
1125 old_bandwidth = GST_M3U8 (previous_variant->data)->bandwidth;
1126 new_bandwidth = GST_M3U8 (current_variant->data)->bandwidth;
1128 /* Don't do anything else if the playlist is the same */
1129 if (new_bandwidth == old_bandwidth) {
1133 demux->client->main->current_variant = current_variant;
1134 GST_M3U8_CLIENT_UNLOCK (demux->client);
1136 gst_m3u8_client_set_current (demux->client, current_variant->data);
1138 GST_INFO_OBJECT (demux, "Client was on %dbps, max allowed is %dbps, switching"
1139 " to bitrate %dbps", old_bandwidth, max_bitrate, new_bandwidth);
1141 if (gst_hls_demux_update_playlist (demux, FALSE)) {
1144 s = gst_structure_new ("playlist",
1145 "uri", G_TYPE_STRING, gst_m3u8_client_get_current_uri (demux->client),
1146 "bitrate", G_TYPE_INT, new_bandwidth, NULL);
1147 gst_element_post_message (GST_ELEMENT_CAST (demux),
1148 gst_message_new_element (GST_OBJECT_CAST (demux), s));
1150 GList *failover = NULL;
1152 GST_INFO_OBJECT (demux, "Unable to update playlist. Switching back");
1153 GST_M3U8_CLIENT_LOCK (demux->client);
1155 failover = g_list_previous (current_variant);
1156 if (failover && new_bandwidth == GST_M3U8 (failover->data)->bandwidth) {
1157 current_variant = failover;
1158 goto retry_failover_protection;
1161 demux->client->main->current_variant = previous_variant;
1162 GST_M3U8_CLIENT_UNLOCK (demux->client);
1163 gst_m3u8_client_set_current (demux->client, previous_variant->data);
1164 /* Try a lower bitrate (or stop if we just tried the lowest) */
1165 if (new_bandwidth ==
1166 GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
1169 return gst_hls_demux_change_playlist (demux, new_bandwidth - 1);
1172 /* Force typefinding since we might have changed media type */
1173 demux->do_typefind = TRUE;
1179 gst_hls_demux_schedule (GstHLSDemux * demux)
1181 gfloat update_factor;
1184 /* As defined in §6.3.4. Reloading the Playlist file:
1185 * "If the client reloads a Playlist file and finds that it has not
1186 * changed then it MUST wait for a period of time before retrying. The
1187 * minimum delay is a multiple of the target duration. This multiple is
1188 * 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter."
1190 count = demux->client->update_failed_count;
1192 update_factor = update_interval_factor[count];
1194 update_factor = update_interval_factor[3];
1196 /* schedule the next update using the target duration field of the
1198 g_time_val_add (&demux->next_update,
1199 gst_m3u8_client_get_target_duration (demux->client)
1200 / GST_SECOND * G_USEC_PER_SEC * update_factor);
1201 GST_DEBUG_OBJECT (demux, "Next update scheduled at %s",
1202 g_time_val_to_iso8601 (&demux->next_update));
1208 gst_hls_demux_switch_playlist (GstHLSDemux * demux)
1214 GstFragment *fragment = g_queue_peek_tail (demux->queue);
1217 GST_M3U8_CLIENT_LOCK (demux->client);
1218 if (!demux->client->main->lists) {
1219 GST_M3U8_CLIENT_UNLOCK (demux->client);
1222 GST_M3U8_CLIENT_UNLOCK (demux->client);
1224 /* compare the time when the fragment was downloaded with the time when it was
1226 g_get_current_time (&now);
1227 diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (demux->next_update));
1228 buffer = gst_fragment_get_buffer (fragment);
1229 size = gst_buffer_get_size (buffer);
1230 bitrate = (size * 8) / ((double) diff / GST_SECOND);
1232 GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d",
1233 (guint) size, GST_TIME_ARGS (diff), bitrate);
1235 gst_buffer_unref (buffer);
1236 return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
1239 static GstFragment *
1240 gst_hls_demux_decrypt_fragment (GstHLSDemux * demux,
1241 GstFragment * encrypted_fragment, const gchar * key, const guint8 * iv)
1243 GstFragment *key_fragment, *ret;
1244 GstBuffer *key_buffer, *encrypted_buffer, *decrypted_buffer;
1245 GstMapInfo key_info, encrypted_info, decrypted_info;
1246 gnutls_cipher_hd_t aes_ctx;
1247 gnutls_datum_t key_d, iv_d;
1248 gsize unpadded_size;
1250 GST_INFO_OBJECT (demux, "Fetching key %s", key);
1251 key_fragment = gst_uri_downloader_fetch_uri (demux->downloader, key);
1252 if (key_fragment == NULL)
1255 key_buffer = gst_fragment_get_buffer (key_fragment);
1256 encrypted_buffer = gst_fragment_get_buffer (encrypted_fragment);
1258 gst_buffer_new_allocate (NULL, gst_buffer_get_size (encrypted_buffer),
1261 gst_buffer_map (key_buffer, &key_info, GST_MAP_READ);
1262 gst_buffer_map (encrypted_buffer, &encrypted_info, GST_MAP_READ);
1263 gst_buffer_map (decrypted_buffer, &decrypted_info, GST_MAP_WRITE);
1265 key_d.data = key_info.data;
1267 iv_d.data = (unsigned char *) iv;
1269 gnutls_cipher_init (&aes_ctx, gnutls_cipher_get_id ("AES-128-CBC"), &key_d,
1271 gnutls_cipher_decrypt2 (aes_ctx, encrypted_info.data, encrypted_info.size,
1272 decrypted_info.data, decrypted_info.size);
1273 gnutls_cipher_deinit (aes_ctx);
1275 /* Handle pkcs7 unpadding here */
1277 decrypted_info.size - decrypted_info.data[decrypted_info.size - 1];
1279 gst_buffer_unmap (decrypted_buffer, &decrypted_info);
1280 gst_buffer_unmap (encrypted_buffer, &encrypted_info);
1281 gst_buffer_unmap (key_buffer, &key_info);
1283 gst_buffer_resize (decrypted_buffer, 0, unpadded_size);
1285 gst_buffer_unref (key_buffer);
1286 gst_buffer_unref (encrypted_buffer);
1287 g_object_unref (key_fragment);
1288 g_object_unref (encrypted_fragment);
1290 ret = gst_fragment_new ();
1291 gst_fragment_add_buffer (ret, decrypted_buffer);
1292 ret->completed = TRUE;
1298 gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching)
1300 GstFragment *download;
1301 const gchar *next_fragment_uri;
1302 GstClockTime duration;
1303 GstClockTime timestamp;
1306 const gchar *key = NULL;
1307 const guint8 *iv = NULL;
1309 if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
1310 &next_fragment_uri, &duration, ×tamp, &key, &iv)) {
1311 GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
1312 demux->end_of_playlist = TRUE;
1313 gst_task_start (demux->stream_task);
1317 GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
1319 download = gst_uri_downloader_fetch_uri (demux->downloader,
1322 if (download && key)
1323 download = gst_hls_demux_decrypt_fragment (demux, download, key, iv);
1325 if (download == NULL)
1328 buf = gst_fragment_get_buffer (download);
1330 GST_BUFFER_DURATION (buf) = duration;
1331 GST_BUFFER_PTS (buf) = timestamp;
1333 /* We actually need to do this every time we switch bitrate */
1334 if (G_UNLIKELY (demux->do_typefind)) {
1335 GstCaps *caps = gst_fragment_get_caps (download);
1337 if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
1338 gst_caps_replace (&demux->input_caps, caps);
1339 /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
1340 GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
1342 demux->do_typefind = FALSE;
1344 gst_caps_unref (caps);
1346 gst_fragment_set_caps (download, demux->input_caps);
1350 GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
1351 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1354 GST_DEBUG_OBJECT (demux, "Pushing fragment in queue");
1355 g_queue_push_tail (demux->queue, download);
1357 GST_TASK_SIGNAL (demux->updates_task);
1358 gst_task_start (demux->stream_task);
1364 gst_hls_demux_stop (demux);