hlsdemux: Give a proper name to the srcpads and remove it when resetting the element
[platform/upstream/gstreamer.git] / ext / hls / gsthlsdemux.c
1 /* GStreamer
2  * Copyright (C) 2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
3  * Copyright (C) 2010 Andoni Morales Alastruey <ylatuya@gmail.com>
4  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
5  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
6  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
7  *
8  * Gsthlsdemux.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 /**
26  * SECTION:element-hlsdemux
27  *
28  * HTTP Live Streaming demuxer element.
29  *
30  * <refsect2>
31  * <title>Example launch line</title>
32  * |[
33  * gst-launch souphttpsrc location=http://devimages.apple.com/iphone/samples/bipbop/gear4/prog_index.m3u8 ! hlsdemux ! decodebin2 ! videoconvert ! videoscale ! autovideosink
34  * ]|
35  * </refsect2>
36  *
37  * Last reviewed on 2010-10-07
38  */
39
40 #ifdef HAVE_CONFIG_H
41 #  include "config.h"
42 #endif
43
44 #include <string.h>
45 #ifdef HAVE_NETTLE
46 #include <nettle/aes.h>
47 #include <nettle/cbc.h>
48 #else
49 #include <gcrypt.h>
50 #endif
51 #include "gsthlsdemux.h"
52
53 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
54     GST_PAD_SRC,
55     GST_PAD_SOMETIMES,
56     GST_STATIC_CAPS_ANY);
57
58 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
59     GST_PAD_SINK,
60     GST_PAD_ALWAYS,
61     GST_STATIC_CAPS ("application/x-hls"));
62
63 GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug);
64 #define GST_CAT_DEFAULT gst_hls_demux_debug
65
66 enum
67 {
68   PROP_0,
69
70   PROP_FRAGMENTS_CACHE,
71   PROP_BITRATE_LIMIT,
72   PROP_CONNECTION_SPEED,
73   PROP_LAST
74 };
75
76 static const float update_interval_factor[] = { 1, 0.5, 1.5, 3 };
77
78 #define DEFAULT_FRAGMENTS_CACHE 3
79 #define DEFAULT_FAILED_COUNT 3
80 #define DEFAULT_BITRATE_LIMIT 0.8
81 #define DEFAULT_CONNECTION_SPEED    0
82
83 /* GObject */
84 static void gst_hls_demux_set_property (GObject * object, guint prop_id,
85     const GValue * value, GParamSpec * pspec);
86 static void gst_hls_demux_get_property (GObject * object, guint prop_id,
87     GValue * value, GParamSpec * pspec);
88 static void gst_hls_demux_dispose (GObject * obj);
89
90 /* GstElement */
91 static GstStateChangeReturn
92 gst_hls_demux_change_state (GstElement * element, GstStateChange transition);
93
94 /* GstHLSDemux */
95 static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstObject * parent,
96     GstBuffer * buf);
97 static gboolean gst_hls_demux_sink_event (GstPad * pad, GstObject * parent,
98     GstEvent * event);
99 static gboolean gst_hls_demux_src_event (GstPad * pad, GstObject * parent,
100     GstEvent * event);
101 static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent,
102     GstQuery * query);
103 static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
104 static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
105 static void gst_hls_demux_stop (GstHLSDemux * demux);
106 static void gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching);
107 static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux);
108 static gboolean gst_hls_demux_schedule (GstHLSDemux * demux);
109 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux);
110 static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
111     gboolean caching, GError ** err);
112 static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
113     gboolean update, GError ** err);
114 static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose);
115 static gboolean gst_hls_demux_set_location (GstHLSDemux * demux,
116     const gchar * uri);
117 static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
118
119 #define gst_hls_demux_parent_class parent_class
120 G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
121
122 static void
123 gst_hls_demux_dispose (GObject * obj)
124 {
125   GstHLSDemux *demux = GST_HLS_DEMUX (obj);
126
127   if (demux->stream_task) {
128     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
129       GST_DEBUG_OBJECT (demux, "Leaving streaming task");
130       gst_task_stop (demux->stream_task);
131       g_rec_mutex_lock (&demux->stream_lock);
132       g_rec_mutex_unlock (&demux->stream_lock);
133       gst_task_join (demux->stream_task);
134     }
135     gst_object_unref (demux->stream_task);
136     g_rec_mutex_clear (&demux->stream_lock);
137     demux->stream_task = NULL;
138   }
139
140   if (demux->updates_task) {
141     if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
142       GST_DEBUG_OBJECT (demux, "Leaving updates task");
143       demux->cancelled = TRUE;
144       gst_uri_downloader_cancel (demux->downloader);
145       gst_task_stop (demux->updates_task);
146       g_mutex_lock (&demux->updates_timed_lock);
147       GST_TASK_SIGNAL (demux->updates_task);
148       g_rec_mutex_lock (&demux->updates_lock);
149       g_rec_mutex_unlock (&demux->updates_lock);
150       g_mutex_unlock (&demux->updates_timed_lock);
151       gst_task_join (demux->updates_task);
152     }
153     gst_object_unref (demux->updates_task);
154     g_mutex_clear (&demux->updates_timed_lock);
155     g_rec_mutex_clear (&demux->updates_lock);
156     demux->updates_task = NULL;
157   }
158
159   if (demux->downloader != NULL) {
160     g_object_unref (demux->downloader);
161     demux->downloader = NULL;
162   }
163
164   gst_hls_demux_reset (demux, TRUE);
165
166   g_queue_free (demux->queue);
167
168   G_OBJECT_CLASS (parent_class)->dispose (obj);
169 }
170
171 static void
172 gst_hls_demux_class_init (GstHLSDemuxClass * klass)
173 {
174   GObjectClass *gobject_class;
175   GstElementClass *element_class;
176
177   gobject_class = (GObjectClass *) klass;
178   element_class = (GstElementClass *) klass;
179
180   gobject_class->set_property = gst_hls_demux_set_property;
181   gobject_class->get_property = gst_hls_demux_get_property;
182   gobject_class->dispose = gst_hls_demux_dispose;
183
184   g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
185       g_param_spec_uint ("fragments-cache", "Fragments cache",
186           "Number of fragments needed to be cached to start playing",
187           1, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
188           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
189
190   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
191       g_param_spec_float ("bitrate-limit",
192           "Bitrate limit in %",
193           "Limit of the available bitrate to use when switching to alternates.",
194           0, 1, DEFAULT_BITRATE_LIMIT,
195           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
198       g_param_spec_uint ("connection-speed", "Connection Speed",
199           "Network connection speed in kbps (0 = unknown)",
200           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
201           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
202
203   element_class->change_state = GST_DEBUG_FUNCPTR (gst_hls_demux_change_state);
204
205   gst_element_class_add_pad_template (element_class,
206       gst_static_pad_template_get (&srctemplate));
207
208   gst_element_class_add_pad_template (element_class,
209       gst_static_pad_template_get (&sinktemplate));
210
211   gst_element_class_set_static_metadata (element_class,
212       "HLS Demuxer",
213       "Demuxer/URIList",
214       "HTTP Live Streaming demuxer",
215       "Marc-Andre Lureau <marcandre.lureau@gmail.com>\n"
216       "Andoni Morales Alastruey <ylatuya@gmail.com>");
217
218   GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
219       "hlsdemux element");
220 }
221
222 static void
223 gst_hls_demux_init (GstHLSDemux * demux)
224 {
225   /* sink pad */
226   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
227   gst_pad_set_chain_function (demux->sinkpad,
228       GST_DEBUG_FUNCPTR (gst_hls_demux_chain));
229   gst_pad_set_event_function (demux->sinkpad,
230       GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event));
231   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
232
233   /* Downloader */
234   demux->downloader = gst_uri_downloader_new ();
235
236   demux->do_typefind = TRUE;
237
238   /* Properties */
239   demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
240   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
241   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
242
243   demux->queue = g_queue_new ();
244
245   /* Updates task */
246   g_rec_mutex_init (&demux->updates_lock);
247   demux->updates_task =
248       gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux, NULL);
249   gst_task_set_lock (demux->updates_task, &demux->updates_lock);
250   g_mutex_init (&demux->updates_timed_lock);
251
252   /* Streaming task */
253   g_rec_mutex_init (&demux->stream_lock);
254   demux->stream_task =
255       gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux, NULL);
256   gst_task_set_lock (demux->stream_task, &demux->stream_lock);
257
258   demux->have_group_id = FALSE;
259   demux->group_id = G_MAXUINT;
260 }
261
262 static void
263 gst_hls_demux_set_property (GObject * object, guint prop_id,
264     const GValue * value, GParamSpec * pspec)
265 {
266   GstHLSDemux *demux = GST_HLS_DEMUX (object);
267
268   switch (prop_id) {
269     case PROP_FRAGMENTS_CACHE:
270       demux->fragments_cache = g_value_get_uint (value);
271       break;
272     case PROP_BITRATE_LIMIT:
273       demux->bitrate_limit = g_value_get_float (value);
274       break;
275     case PROP_CONNECTION_SPEED:
276       demux->connection_speed = g_value_get_uint (value) * 1000;
277       break;
278     default:
279       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
280       break;
281   }
282 }
283
284 static void
285 gst_hls_demux_get_property (GObject * object, guint prop_id, GValue * value,
286     GParamSpec * pspec)
287 {
288   GstHLSDemux *demux = GST_HLS_DEMUX (object);
289
290   switch (prop_id) {
291     case PROP_FRAGMENTS_CACHE:
292       g_value_set_uint (value, demux->fragments_cache);
293       break;
294     case PROP_BITRATE_LIMIT:
295       g_value_set_float (value, demux->bitrate_limit);
296       break;
297     case PROP_CONNECTION_SPEED:
298       g_value_set_uint (value, demux->connection_speed / 1000);
299       break;
300     default:
301       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
302       break;
303   }
304 }
305
306 static GstStateChangeReturn
307 gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
308 {
309   GstStateChangeReturn ret;
310   GstHLSDemux *demux = GST_HLS_DEMUX (element);
311
312   switch (transition) {
313     case GST_STATE_CHANGE_READY_TO_PAUSED:
314       gst_hls_demux_reset (demux, FALSE);
315       gst_uri_downloader_reset (demux->downloader);
316       break;
317     default:
318       break;
319   }
320
321   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
322
323   switch (transition) {
324     case GST_STATE_CHANGE_PAUSED_TO_READY:
325       demux->cancelled = TRUE;
326       gst_hls_demux_stop (demux);
327       gst_task_join (demux->stream_task);
328       gst_task_join (demux->updates_task);
329       gst_hls_demux_reset (demux, FALSE);
330       break;
331     default:
332       break;
333   }
334   return ret;
335 }
336
337 static gboolean
338 gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
339 {
340   GstHLSDemux *demux;
341
342   demux = GST_HLS_DEMUX (parent);
343
344   switch (event->type) {
345     case GST_EVENT_SEEK:
346     {
347       gdouble rate;
348       GstFormat format;
349       GstSeekFlags flags;
350       GstSeekType start_type, stop_type;
351       gint64 start, stop;
352       GList *walk;
353       GstClockTime position, current_pos, target_pos;
354       gint current_sequence;
355       GstM3U8MediaFile *file;
356
357       GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
358
359       if (gst_m3u8_client_is_live (demux->client)) {
360         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
361         return FALSE;
362       }
363
364       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
365           &stop_type, &stop);
366
367       if (format != GST_FORMAT_TIME)
368         return FALSE;
369
370       GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
371           " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
372           GST_TIME_ARGS (stop));
373
374       GST_M3U8_CLIENT_LOCK (demux->client);
375       file = GST_M3U8_MEDIA_FILE (demux->client->current->files->data);
376       current_sequence = file->sequence;
377       current_pos = 0;
378       target_pos = (GstClockTime) start;
379       for (walk = demux->client->current->files; walk; walk = walk->next) {
380         file = walk->data;
381
382         current_sequence = file->sequence;
383         if (current_pos <= target_pos
384             && target_pos < current_pos + file->duration) {
385           break;
386         }
387         current_pos += file->duration;
388       }
389       GST_M3U8_CLIENT_UNLOCK (demux->client);
390
391       if (walk == NULL) {
392         GST_WARNING_OBJECT (demux, "Could not find seeked fragment");
393         return FALSE;
394       }
395
396       if (flags & GST_SEEK_FLAG_FLUSH) {
397         GST_DEBUG_OBJECT (demux, "sending flush start");
398         gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ());
399       }
400
401       demux->cancelled = TRUE;
402       gst_task_pause (demux->stream_task);
403       gst_uri_downloader_cancel (demux->downloader);
404       gst_task_stop (demux->updates_task);
405       g_mutex_lock (&demux->updates_timed_lock);
406       GST_TASK_SIGNAL (demux->updates_task);
407       g_mutex_unlock (&demux->updates_timed_lock);
408       g_rec_mutex_lock (&demux->updates_lock);
409       g_rec_mutex_unlock (&demux->updates_lock);
410       gst_task_pause (demux->stream_task);
411
412       /* wait for streaming to finish */
413       g_rec_mutex_lock (&demux->stream_lock);
414
415       demux->need_cache = TRUE;
416       while (!g_queue_is_empty (demux->queue)) {
417         GstFragment *fragment = g_queue_pop_head (demux->queue);
418         g_object_unref (fragment);
419       }
420       g_queue_clear (demux->queue);
421
422       GST_M3U8_CLIENT_LOCK (demux->client);
423       GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence);
424       demux->client->sequence = current_sequence;
425       gst_m3u8_client_get_current_position (demux->client, &position);
426       demux->position_shift = start - position;
427       demux->need_segment = TRUE;
428       GST_M3U8_CLIENT_UNLOCK (demux->client);
429
430       if (flags & GST_SEEK_FLAG_FLUSH) {
431         GST_DEBUG_OBJECT (demux, "sending flush stop");
432         gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE));
433       }
434
435       demux->cancelled = FALSE;
436       gst_uri_downloader_reset (demux->downloader);
437       gst_task_start (demux->stream_task);
438       g_rec_mutex_unlock (&demux->stream_lock);
439
440       return TRUE;
441     }
442     default:
443       break;
444   }
445
446   return gst_pad_event_default (pad, parent, event);
447 }
448
449 static gboolean
450 gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
451 {
452   GstHLSDemux *demux;
453   GstQuery *query;
454   gboolean ret;
455   gchar *uri;
456
457   demux = GST_HLS_DEMUX (parent);
458
459   switch (event->type) {
460     case GST_EVENT_EOS:{
461       gchar *playlist = NULL;
462
463       if (demux->playlist == NULL) {
464         GST_WARNING_OBJECT (demux, "Received EOS without a playlist.");
465         break;
466       }
467
468       GST_DEBUG_OBJECT (demux,
469           "Got EOS on the sink pad: main playlist fetched");
470
471       query = gst_query_new_uri ();
472       ret = gst_pad_peer_query (demux->sinkpad, query);
473       if (ret) {
474         gst_query_parse_uri_redirection (query, &uri);
475         if (uri == NULL)
476           gst_query_parse_uri (query, &uri);
477         gst_hls_demux_set_location (demux, uri);
478         g_free (uri);
479       }
480       gst_query_unref (query);
481
482       playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist);
483       demux->playlist = NULL;
484       if (playlist == NULL) {
485         GST_WARNING_OBJECT (demux, "Error validating first playlist.");
486       } else if (!gst_m3u8_client_update (demux->client, playlist)) {
487         /* In most cases, this will happen if we set a wrong url in the
488          * source element and we have received the 404 HTML response instead of
489          * the playlist */
490         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
491             (NULL));
492         return FALSE;
493       }
494
495       if (!ret && gst_m3u8_client_is_live (demux->client)) {
496         GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
497             ("Failed querying the playlist uri, "
498                 "required for live sources."), (NULL));
499         return FALSE;
500       }
501
502       gst_task_start (demux->stream_task);
503       gst_event_unref (event);
504       return TRUE;
505     }
506     case GST_EVENT_SEGMENT:
507       /* Swallow newsegments, we'll push our own */
508       gst_event_unref (event);
509       return TRUE;
510     default:
511       break;
512   }
513
514   return gst_pad_event_default (pad, parent, event);
515 }
516
517 static gboolean
518 gst_hls_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
519 {
520   GstHLSDemux *hlsdemux;
521   gboolean ret = FALSE;
522
523   if (query == NULL)
524     return FALSE;
525
526   hlsdemux = GST_HLS_DEMUX (parent);
527
528   switch (query->type) {
529     case GST_QUERY_DURATION:{
530       GstClockTime duration = -1;
531       GstFormat fmt;
532
533       gst_query_parse_duration (query, &fmt, NULL);
534       if (fmt == GST_FORMAT_TIME) {
535         duration = gst_m3u8_client_get_duration (hlsdemux->client);
536         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
537           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
538           ret = TRUE;
539         }
540       }
541       GST_INFO_OBJECT (hlsdemux, "GST_QUERY_DURATION returns %s with duration %"
542           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
543       break;
544     }
545     case GST_QUERY_URI:
546       if (hlsdemux->client) {
547         /* FIXME: Do we answer with the variant playlist, with the current
548          * playlist or the the uri of the least downlowaded fragment? */
549         gst_query_set_uri (query, gst_m3u8_client_get_uri (hlsdemux->client));
550         ret = TRUE;
551       }
552       break;
553     case GST_QUERY_SEEKING:{
554       GstFormat fmt;
555       gint64 stop = -1;
556
557       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
558       GST_INFO_OBJECT (hlsdemux, "Received GST_QUERY_SEEKING with format %d",
559           fmt);
560       if (fmt == GST_FORMAT_TIME) {
561         GstClockTime duration;
562
563         duration = gst_m3u8_client_get_duration (hlsdemux->client);
564         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
565           stop = duration;
566
567         gst_query_set_seeking (query, fmt,
568             !gst_m3u8_client_is_live (hlsdemux->client), 0, stop);
569         ret = TRUE;
570         GST_INFO_OBJECT (hlsdemux, "GST_QUERY_SEEKING returning with stop : %"
571             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
572       }
573       break;
574     }
575     default:
576       /* Don't fordward queries upstream because of the special nature of this
577        * "demuxer", which relies on the upstream element only to be fed with the
578        * first playlist */
579       break;
580   }
581
582   return ret;
583 }
584
585 static GstFlowReturn
586 gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
587 {
588   GstHLSDemux *demux = GST_HLS_DEMUX (parent);
589
590   if (demux->playlist == NULL)
591     demux->playlist = buf;
592   else
593     demux->playlist = gst_buffer_append (demux->playlist, buf);
594
595   return GST_FLOW_OK;
596 }
597
598 static void
599 gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching)
600 {
601   if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
602     demux->cancelled = TRUE;
603     gst_uri_downloader_cancel (demux->downloader);
604     gst_task_pause (demux->updates_task);
605     if (!caching)
606       g_mutex_lock (&demux->updates_timed_lock);
607     GST_TASK_SIGNAL (demux->updates_task);
608     if (!caching)
609       g_mutex_unlock (&demux->updates_timed_lock);
610   }
611
612   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
613     demux->stop_stream_task = TRUE;
614     gst_task_pause (demux->stream_task);
615   }
616 }
617
618 static void
619 gst_hls_demux_stop (GstHLSDemux * demux)
620 {
621   gst_uri_downloader_cancel (demux->downloader);
622
623   if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
624     demux->cancelled = TRUE;
625     gst_uri_downloader_cancel (demux->downloader);
626     gst_task_stop (demux->updates_task);
627     g_mutex_lock (&demux->updates_timed_lock);
628     GST_TASK_SIGNAL (demux->updates_task);
629     g_mutex_unlock (&demux->updates_timed_lock);
630     g_rec_mutex_lock (&demux->updates_lock);
631     g_rec_mutex_unlock (&demux->updates_lock);
632   }
633
634   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
635     demux->stop_stream_task = TRUE;
636     gst_task_stop (demux->stream_task);
637     g_rec_mutex_lock (&demux->stream_lock);
638     g_rec_mutex_unlock (&demux->stream_lock);
639   }
640 }
641
642 static void
643 switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
644 {
645   GstPad *oldpad = demux->srcpad;
646   GstEvent *event;
647   gchar *stream_id;
648   gchar *name;
649
650   GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
651       newcaps);
652
653   /* First create and activate new pad */
654   name = g_strdup_printf ("src_%u", demux->srcpad_counter++);
655   demux->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
656   g_free (name);
657   gst_pad_set_event_function (demux->srcpad,
658       GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
659   gst_pad_set_query_function (demux->srcpad,
660       GST_DEBUG_FUNCPTR (gst_hls_demux_src_query));
661   gst_pad_use_fixed_caps (demux->srcpad);
662   gst_pad_set_active (demux->srcpad, TRUE);
663
664   stream_id =
665       gst_pad_create_stream_id (demux->srcpad, GST_ELEMENT_CAST (demux), NULL);
666
667   event = gst_pad_get_sticky_event (demux->sinkpad, GST_EVENT_STREAM_START, 0);
668   if (event) {
669     if (gst_event_parse_group_id (event, &demux->group_id))
670       demux->have_group_id = TRUE;
671     else
672       demux->have_group_id = FALSE;
673     gst_event_unref (event);
674   } else if (!demux->have_group_id) {
675     demux->have_group_id = TRUE;
676     demux->group_id = gst_util_group_id_next ();
677   }
678   event = gst_event_new_stream_start (stream_id);
679   if (demux->have_group_id)
680     gst_event_set_group_id (event, demux->group_id);
681
682   gst_pad_push_event (demux->srcpad, event);
683   g_free (stream_id);
684
685   gst_pad_set_caps (demux->srcpad, newcaps);
686
687   gst_element_add_pad (GST_ELEMENT (demux), demux->srcpad);
688
689   gst_element_no_more_pads (GST_ELEMENT (demux));
690
691   if (oldpad) {
692     /* Push out EOS */
693     gst_pad_push_event (oldpad, gst_event_new_eos ());
694     gst_pad_set_active (oldpad, FALSE);
695     gst_element_remove_pad (GST_ELEMENT (demux), oldpad);
696   }
697 }
698
699 static void
700 gst_hls_demux_stream_loop (GstHLSDemux * demux)
701 {
702   GstFragment *fragment = NULL;
703   GstBuffer *buf;
704   GstFlowReturn ret;
705   GstCaps *bufcaps, *srccaps = NULL;
706
707   /* Loop for the source pad task. The task is started when we have
708    * received the main playlist from the source element. It tries first to
709    * cache the first fragments and then it waits until it has more data in the
710    * queue. This task is woken up when we push a new fragment to the queue or
711    * when we reached the end of the playlist  */
712   GST_DEBUG_OBJECT (demux, "Enter task");
713
714   if (G_UNLIKELY (demux->need_cache)) {
715     if (!gst_hls_demux_cache_fragments (demux))
716       goto cache_error;
717
718     /* Pop off the first fragment immediately so the
719      * update task can get the next one already */
720     fragment = g_queue_pop_head (demux->queue);
721
722     /* we can start now the updates thread (only if on playing) */
723     gst_task_start (demux->updates_task);
724     GST_INFO_OBJECT (demux, "First fragments cached successfully");
725   }
726
727   if (!fragment && g_queue_is_empty (demux->queue)) {
728     if (demux->end_of_playlist)
729       goto end_of_playlist;
730
731     goto pause_task;
732   }
733
734   /* If we didn't get our fragment above already */
735   if (!fragment)
736     fragment = g_queue_pop_head (demux->queue);
737
738   /* Figure out if we need to create/switch pads */
739   if (G_LIKELY (demux->srcpad))
740     srccaps = gst_pad_get_current_caps (demux->srcpad);
741   bufcaps = gst_fragment_get_caps (fragment);
742   if (G_UNLIKELY (!bufcaps)) {
743     if (srccaps)
744       gst_caps_unref (srccaps);
745     g_object_unref (fragment);
746     goto type_not_found;
747   }
748
749   buf = gst_fragment_get_buffer (fragment);
750
751   if (G_UNLIKELY (!srccaps || !gst_caps_is_equal_fixed (bufcaps, srccaps)
752           || demux->need_segment)) {
753     switch_pads (demux, bufcaps);
754     demux->need_segment = TRUE;
755   }
756   gst_caps_unref (bufcaps);
757   if (G_LIKELY (srccaps))
758     gst_caps_unref (srccaps);
759   g_object_unref (fragment);
760
761   if (demux->need_segment) {
762     GstSegment segment;
763     GstClockTime start = GST_BUFFER_PTS (buf);
764
765     start += demux->position_shift;
766     /* And send a newsegment */
767     GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
768         GST_TIME_FORMAT, GST_TIME_ARGS (start));
769     gst_segment_init (&segment, GST_FORMAT_TIME);
770     segment.start = start;
771     segment.time = start;
772     gst_pad_push_event (demux->srcpad, gst_event_new_segment (&segment));
773     demux->need_segment = FALSE;
774     demux->position_shift = 0;
775   }
776
777   GST_DEBUG_OBJECT (demux, "Pushing buffer %p", buf);
778
779   ret = gst_pad_push (demux->srcpad, buf);
780   if (ret != GST_FLOW_OK)
781     goto error_pushing;
782
783   GST_DEBUG_OBJECT (demux, "Pushed buffer");
784
785   return;
786
787 end_of_playlist:
788   {
789     GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS");
790     gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
791     gst_hls_demux_pause_tasks (demux, FALSE);
792     return;
793   }
794
795 cache_error:
796   {
797     /* Pausing a stopped task will start it */
798     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED)
799       gst_task_pause (demux->stream_task);
800     if (!demux->cancelled) {
801       GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
802           ("Could not cache the first fragments"), (NULL));
803       gst_hls_demux_pause_tasks (demux, FALSE);
804     }
805     return;
806   }
807
808 type_not_found:
809   {
810     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
811         ("Could not determine type of stream"), (NULL));
812     gst_hls_demux_pause_tasks (demux, FALSE);
813     return;
814   }
815
816 error_pushing:
817   {
818     if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
819       GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
820           ("stream stopped, reason %s", gst_flow_get_name (ret)));
821       gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
822     } else {
823       GST_DEBUG_OBJECT (demux, "stream stopped, reason %s",
824           gst_flow_get_name (ret));
825     }
826     gst_hls_demux_pause_tasks (demux, FALSE);
827     return;
828   }
829
830 pause_task:
831   {
832     GST_DEBUG_OBJECT (demux, "Pause task");
833     /* Pausing a stopped task will start it */
834     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED)
835       gst_task_pause (demux->stream_task);
836     return;
837   }
838 }
839
840 static void
841 gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
842 {
843   demux->need_cache = TRUE;
844   demux->end_of_playlist = FALSE;
845   demux->cancelled = FALSE;
846   demux->do_typefind = TRUE;
847
848   g_free (demux->key_url);
849   demux->key_url = NULL;
850
851   if (demux->key_fragment)
852     g_object_unref (demux->key_fragment);
853   demux->key_fragment = NULL;
854
855   if (demux->input_caps) {
856     gst_caps_unref (demux->input_caps);
857     demux->input_caps = NULL;
858   }
859
860   if (demux->playlist) {
861     gst_buffer_unref (demux->playlist);
862     demux->playlist = NULL;
863   }
864
865   if (demux->client) {
866     gst_m3u8_client_free (demux->client);
867     demux->client = NULL;
868   }
869
870   if (!dispose) {
871     demux->client = gst_m3u8_client_new ("");
872   }
873
874   while (!g_queue_is_empty (demux->queue)) {
875     GstFragment *fragment = g_queue_pop_head (demux->queue);
876     g_object_unref (fragment);
877   }
878   g_queue_clear (demux->queue);
879
880   demux->position_shift = 0;
881   demux->need_segment = TRUE;
882
883   demux->have_group_id = FALSE;
884   demux->group_id = G_MAXUINT;
885
886   demux->srcpad_counter = 0;
887   if (demux->srcpad) {
888     gst_element_remove_pad (GST_ELEMENT_CAST (demux), demux->srcpad);
889     demux->srcpad = NULL;
890   }
891 }
892
893 static gboolean
894 gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri)
895 {
896   if (demux->client)
897     gst_m3u8_client_free (demux->client);
898   demux->client = gst_m3u8_client_new (uri);
899   GST_INFO_OBJECT (demux, "Changed location: %s", uri);
900   return TRUE;
901 }
902
903 void
904 gst_hls_demux_updates_loop (GstHLSDemux * demux)
905 {
906   /* Loop for the updates. It's started when the first fragments are cached and
907    * schedules the next update of the playlist (for lives sources) and the next
908    * update of fragments. When a new fragment is downloaded, it compares the
909    * download time with the next scheduled update to check if we can or should
910    * switch to a different bitrate */
911
912   /* block until the next scheduled update or the signal to quit this thread */
913   g_mutex_lock (&demux->updates_timed_lock);
914   GST_DEBUG_OBJECT (demux, "Started updates task");
915   while (TRUE) {
916     if (demux->cancelled)
917       goto quit;
918
919     /* fetch the next fragment */
920     if (g_queue_get_length (demux->queue) < demux->fragments_cache) {
921       GError *err = NULL;
922
923       GST_DEBUG_OBJECT (demux, "queue not full, get next fragment");
924       if (!gst_hls_demux_get_next_fragment (demux, FALSE, &err)) {
925         if (demux->cancelled) {
926           g_clear_error (&err);
927           goto quit;
928         } else if (!demux->end_of_playlist) {
929           demux->client->update_failed_count++;
930           if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
931             GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
932             g_clear_error (&err);
933             continue;
934           } else {
935             gst_element_post_message (GST_ELEMENT_CAST (demux),
936                 gst_message_new_error (GST_OBJECT_CAST (demux), err,
937                     "Could not fetch the next fragment"));
938             g_clear_error (&err);
939             goto error;
940           }
941         }
942       } else {
943         demux->client->update_failed_count = 0;
944
945         if (demux->cancelled)
946           goto quit;
947
948         /* try to switch to another bitrate if needed */
949         gst_hls_demux_switch_playlist (demux);
950       }
951     }
952
953     /* schedule the next update */
954     gst_hls_demux_schedule (demux);
955
956     /*  block until the next scheduled update or the signal to quit this thread */
957     GST_DEBUG_OBJECT (demux, "Waiting");
958     if (g_cond_wait_until (GST_TASK_GET_COND (demux->updates_task),
959             &demux->updates_timed_lock, demux->next_update)) {
960       GST_DEBUG_OBJECT (demux, "Unlocked");
961       goto quit;
962     }
963     GST_DEBUG_OBJECT (demux, "Continue");
964
965     if (demux->cancelled)
966       goto quit;
967
968     /* update the playlist for live sources */
969     if (gst_m3u8_client_is_live (demux->client)) {
970       GError *err = NULL;
971
972       if (!gst_hls_demux_update_playlist (demux, TRUE, &err)) {
973         if (demux->cancelled)
974           goto quit;
975         demux->client->update_failed_count++;
976         if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
977           GST_WARNING_OBJECT (demux, "Could not update the playlist");
978           continue;
979         } else {
980           gst_element_post_message (GST_ELEMENT_CAST (demux),
981               gst_message_new_error (GST_OBJECT_CAST (demux), err,
982                   "Could not update the playlist"));
983           g_error_free (err);
984           goto error;
985         }
986       }
987     }
988
989     /* if it's a live source and the playlist couldn't be updated, there aren't
990      * more fragments in the playlist, so we just wait for the next schedulled
991      * update */
992     if (gst_m3u8_client_is_live (demux->client) &&
993         demux->client->update_failed_count > 0) {
994       GST_WARNING_OBJECT (demux,
995           "The playlist hasn't been updated, failed count is %d",
996           demux->client->update_failed_count);
997       continue;
998     }
999
1000     if (demux->cancelled)
1001       goto quit;
1002   }
1003
1004 quit:
1005   {
1006     GST_DEBUG_OBJECT (demux, "Stopped updates task");
1007     g_mutex_unlock (&demux->updates_timed_lock);
1008     return;
1009   }
1010
1011 error:
1012   {
1013     GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
1014     gst_hls_demux_pause_tasks (demux, TRUE);
1015     g_mutex_unlock (&demux->updates_timed_lock);
1016   }
1017 }
1018
1019 static gboolean
1020 gst_hls_demux_cache_fragments (GstHLSDemux * demux)
1021 {
1022   gint i;
1023
1024   /* If this playlist is a variant playlist, select the first one
1025    * and update it */
1026   if (gst_m3u8_client_has_variant_playlist (demux->client)) {
1027     GstM3U8 *child = NULL;
1028     GError *err = NULL;
1029
1030     if (demux->connection_speed == 0) {
1031
1032       GST_M3U8_CLIENT_LOCK (demux->client);
1033       child = demux->client->main->current_variant->data;
1034       GST_M3U8_CLIENT_UNLOCK (demux->client);
1035     } else {
1036       GList *tmp = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
1037           demux->connection_speed);
1038
1039       child = GST_M3U8 (tmp->data);
1040     }
1041
1042     gst_m3u8_client_set_current (demux->client, child);
1043     if (!gst_hls_demux_update_playlist (demux, FALSE, &err)) {
1044       gst_element_post_message (GST_ELEMENT_CAST (demux),
1045           gst_message_new_error (GST_OBJECT_CAST (demux), err,
1046               "Could not fetch the child playlist"));
1047       g_error_free (err);
1048       return FALSE;
1049     }
1050   }
1051
1052   if (!gst_m3u8_client_is_live (demux->client)) {
1053     GstClockTime duration = gst_m3u8_client_get_duration (demux->client);
1054
1055     GST_DEBUG_OBJECT (demux, "Sending duration message : %" GST_TIME_FORMAT,
1056         GST_TIME_ARGS (duration));
1057     if (duration != GST_CLOCK_TIME_NONE)
1058       gst_element_post_message (GST_ELEMENT (demux),
1059           gst_message_new_duration_changed (GST_OBJECT (demux)));
1060   }
1061
1062   /* Cache the first fragments */
1063   for (i = 0; i < demux->fragments_cache; i++) {
1064     GError *err = NULL;
1065
1066     gst_element_post_message (GST_ELEMENT (demux),
1067         gst_message_new_buffering (GST_OBJECT (demux),
1068             100 * i / demux->fragments_cache));
1069     demux->next_update = g_get_monotonic_time ();
1070     if (!gst_hls_demux_get_next_fragment (demux, TRUE, &err)) {
1071       if (demux->end_of_playlist)
1072         break;
1073       if (!demux->cancelled) {
1074         gst_element_post_message (GST_ELEMENT_CAST (demux),
1075             gst_message_new_error (GST_OBJECT_CAST (demux), err,
1076                 "Error caching the first fragments"));
1077       }
1078       g_clear_error (&err);
1079       return FALSE;
1080     }
1081     /* make sure we stop caching fragments if something cancelled it */
1082     if (demux->cancelled)
1083       return FALSE;
1084     gst_hls_demux_switch_playlist (demux);
1085   }
1086   gst_element_post_message (GST_ELEMENT (demux),
1087       gst_message_new_buffering (GST_OBJECT (demux), 100));
1088
1089   /* Start downloading 1s early to keep the risk of
1090    * underflows lower */
1091   demux->next_update = g_get_monotonic_time () - G_USEC_PER_SEC;
1092
1093   demux->need_cache = FALSE;
1094   return TRUE;
1095 }
1096
1097 static gchar *
1098 gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf)
1099 {
1100   GstMapInfo info;
1101   gchar *playlist;
1102
1103   if (!gst_buffer_map (buf, &info, GST_MAP_READ))
1104     goto map_error;
1105
1106   if (!g_utf8_validate ((gchar *) info.data, info.size, NULL))
1107     goto validate_error;
1108
1109   /* alloc size + 1 to end with a null character */
1110   playlist = g_malloc0 (info.size + 1);
1111   memcpy (playlist, info.data, info.size);
1112
1113   gst_buffer_unmap (buf, &info);
1114   gst_buffer_unref (buf);
1115   return playlist;
1116
1117 validate_error:
1118   gst_buffer_unmap (buf, &info);
1119 map_error:
1120   gst_buffer_unref (buf);
1121   return NULL;
1122 }
1123
1124 static gboolean
1125 gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update,
1126     GError ** err)
1127 {
1128   GstFragment *download;
1129   GstBuffer *buf;
1130   gchar *playlist;
1131   gboolean updated = FALSE;
1132
1133   const gchar *uri = gst_m3u8_client_get_current_uri (demux->client);
1134
1135   download = gst_uri_downloader_fetch_uri (demux->downloader, uri, TRUE, err);
1136   if (download == NULL)
1137     return FALSE;
1138
1139   buf = gst_fragment_get_buffer (download);
1140   playlist = gst_hls_src_buf_to_utf8_playlist (buf);
1141   g_object_unref (download);
1142
1143   if (playlist == NULL) {
1144     GST_WARNING_OBJECT (demux, "Couldn't validate playlist encoding");
1145     g_set_error (err, GST_STREAM_ERROR, GST_STREAM_ERROR_WRONG_TYPE,
1146         "Couldn't validate playlist encoding");
1147     return FALSE;
1148   }
1149
1150   updated = gst_m3u8_client_update (demux->client, playlist);
1151   if (!updated) {
1152     GST_WARNING_OBJECT (demux, "Couldn't update playlist");
1153     g_set_error (err, GST_STREAM_ERROR, GST_STREAM_ERROR_FAILED,
1154         "Couldn't update playlist");
1155     return FALSE;
1156   }
1157
1158   /*  If it's a live source, do not let the sequence number go beyond
1159    * three fragments before the end of the list */
1160   if (updated && update == FALSE && demux->client->current &&
1161       gst_m3u8_client_is_live (demux->client)) {
1162     guint last_sequence;
1163
1164     GST_M3U8_CLIENT_LOCK (demux->client);
1165     last_sequence =
1166         GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
1167             files)->data)->sequence;
1168
1169     if (demux->client->sequence >= last_sequence - 3) {
1170       GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %d",
1171           last_sequence - 3);
1172       demux->need_segment = TRUE;
1173       demux->client->sequence = last_sequence - 3;
1174     }
1175     GST_M3U8_CLIENT_UNLOCK (demux->client);
1176   }
1177
1178   return updated;
1179 }
1180
1181 static gboolean
1182 gst_hls_demux_change_playlist (GstHLSDemux * demux, guint max_bitrate)
1183 {
1184   GList *previous_variant, *current_variant;
1185   gint old_bandwidth, new_bandwidth;
1186
1187   /* If user specifies a connection speed never use a playlist with a bandwidth
1188    * superior than it */
1189   if (demux->connection_speed != 0 && max_bitrate > demux->connection_speed)
1190     max_bitrate = demux->connection_speed;
1191
1192   previous_variant = demux->client->main->current_variant;
1193   current_variant = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
1194       max_bitrate);
1195
1196 retry_failover_protection:
1197   old_bandwidth = GST_M3U8 (previous_variant->data)->bandwidth;
1198   new_bandwidth = GST_M3U8 (current_variant->data)->bandwidth;
1199
1200   /* Don't do anything else if the playlist is the same */
1201   if (new_bandwidth == old_bandwidth) {
1202     return TRUE;
1203   }
1204
1205   demux->client->main->current_variant = current_variant;
1206   GST_M3U8_CLIENT_UNLOCK (demux->client);
1207
1208   gst_m3u8_client_set_current (demux->client, current_variant->data);
1209
1210   GST_INFO_OBJECT (demux, "Client was on %dbps, max allowed is %dbps, switching"
1211       " to bitrate %dbps", old_bandwidth, max_bitrate, new_bandwidth);
1212
1213   if (gst_hls_demux_update_playlist (demux, FALSE, NULL)) {
1214     GstStructure *s;
1215
1216     s = gst_structure_new ("playlist",
1217         "uri", G_TYPE_STRING, gst_m3u8_client_get_current_uri (demux->client),
1218         "bitrate", G_TYPE_INT, new_bandwidth, NULL);
1219     gst_element_post_message (GST_ELEMENT_CAST (demux),
1220         gst_message_new_element (GST_OBJECT_CAST (demux), s));
1221   } else {
1222     GList *failover = NULL;
1223
1224     GST_INFO_OBJECT (demux, "Unable to update playlist. Switching back");
1225     GST_M3U8_CLIENT_LOCK (demux->client);
1226
1227     failover = g_list_previous (current_variant);
1228     if (failover && new_bandwidth == GST_M3U8 (failover->data)->bandwidth) {
1229       current_variant = failover;
1230       goto retry_failover_protection;
1231     }
1232
1233     demux->client->main->current_variant = previous_variant;
1234     GST_M3U8_CLIENT_UNLOCK (demux->client);
1235     gst_m3u8_client_set_current (demux->client, previous_variant->data);
1236     /*  Try a lower bitrate (or stop if we just tried the lowest) */
1237     if (new_bandwidth ==
1238         GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
1239       return FALSE;
1240     else
1241       return gst_hls_demux_change_playlist (demux, new_bandwidth - 1);
1242   }
1243
1244   /* Force typefinding since we might have changed media type */
1245   demux->do_typefind = TRUE;
1246
1247   return TRUE;
1248 }
1249
1250 static gboolean
1251 gst_hls_demux_schedule (GstHLSDemux * demux)
1252 {
1253   gfloat update_factor;
1254   gint count;
1255
1256   /* As defined in Â§6.3.4. Reloading the Playlist file:
1257    * "If the client reloads a Playlist file and finds that it has not
1258    * changed then it MUST wait for a period of time before retrying.  The
1259    * minimum delay is a multiple of the target duration.  This multiple is
1260    * 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter."
1261    */
1262   count = demux->client->update_failed_count;
1263   if (count < 3)
1264     update_factor = update_interval_factor[count];
1265   else
1266     update_factor = update_interval_factor[3];
1267
1268   /* schedule the next update using the target duration field of the
1269    * playlist */
1270   demux->next_update +=
1271       gst_util_uint64_scale (gst_m3u8_client_get_current_fragment_duration
1272       (demux->client), G_USEC_PER_SEC * update_factor, GST_SECOND);
1273   GST_DEBUG_OBJECT (demux, "Next update scheduled at %" G_GINT64_FORMAT,
1274       demux->next_update);
1275
1276   return TRUE;
1277 }
1278
1279 static gboolean
1280 gst_hls_demux_switch_playlist (GstHLSDemux * demux)
1281 {
1282   GstClockTime diff;
1283   gsize size;
1284   gint bitrate;
1285   GstFragment *fragment;
1286   GstBuffer *buffer;
1287
1288   GST_M3U8_CLIENT_LOCK (demux->client);
1289   fragment = g_queue_peek_tail (demux->queue);
1290   if (!demux->client->main->lists || !fragment) {
1291     GST_M3U8_CLIENT_UNLOCK (demux->client);
1292     return TRUE;
1293   }
1294   GST_M3U8_CLIENT_UNLOCK (demux->client);
1295
1296   /* compare the time when the fragment was downloaded with the time when it was
1297    * scheduled */
1298   diff = g_get_monotonic_time () - demux->next_update;
1299   buffer = gst_fragment_get_buffer (fragment);
1300   size = gst_buffer_get_size (buffer);
1301   bitrate = (size * 8) / ((double) diff / G_USEC_PER_SEC);
1302
1303   GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d",
1304       (guint) size, GST_TIME_ARGS (diff), bitrate);
1305
1306   gst_buffer_unref (buffer);
1307   return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
1308 }
1309
1310 #ifdef HAVE_NETTLE
1311 static gboolean
1312 decrypt_fragment (GstHLSDemux * demux, gsize length,
1313     const guint8 * encrypted_data, guint8 * decrypted_data,
1314     const guint8 * key_data, const guint8 * iv_data)
1315 {
1316   struct CBC_CTX (struct aes_ctx, AES_BLOCK_SIZE) aes_ctx;
1317
1318   if (length % 16 != 0)
1319     return FALSE;
1320
1321   aes_set_decrypt_key (&aes_ctx.ctx, 16, key_data);
1322   CBC_SET_IV (&aes_ctx, iv_data);
1323
1324   CBC_DECRYPT (&aes_ctx, aes_decrypt, length, decrypted_data, encrypted_data);
1325
1326   return TRUE;
1327 }
1328 #else
1329 static gboolean
1330 decrypt_fragment (GstHLSDemux * demux, gsize length,
1331     const guint8 * encrypted_data, guint8 * decrypted_data,
1332     const guint8 * key_data, const guint8 * iv_data)
1333 {
1334   gcry_cipher_hd_t aes_ctx = NULL;
1335   gcry_error_t err = 0;
1336   gboolean ret = FALSE;
1337
1338   err =
1339       gcry_cipher_open (&aes_ctx, GCRY_CIPHER_AES128, GCRY_CIPHER_MODE_CBC, 0);
1340   if (err)
1341     goto out;
1342   err = gcry_cipher_setkey (aes_ctx, key_data, 16);
1343   if (err)
1344     goto out;
1345   err = gcry_cipher_setiv (aes_ctx, iv_data, 16);
1346   if (err)
1347     goto out;
1348   err = gcry_cipher_decrypt (aes_ctx, decrypted_data, length,
1349       encrypted_data, length);
1350   if (err)
1351     goto out;
1352
1353   ret = TRUE;
1354
1355 out:
1356   if (aes_ctx)
1357     gcry_cipher_close (aes_ctx);
1358
1359   return ret;
1360 }
1361 #endif
1362
1363 static GstFragment *
1364 gst_hls_demux_decrypt_fragment (GstHLSDemux * demux,
1365     GstFragment * encrypted_fragment, const gchar * key, const guint8 * iv,
1366     GError ** err)
1367 {
1368   GstFragment *key_fragment, *ret = NULL;
1369   GstBuffer *key_buffer, *encrypted_buffer, *decrypted_buffer;
1370   GstMapInfo key_info, encrypted_info, decrypted_info;
1371   gsize unpadded_size;
1372
1373   if (demux->key_url && strcmp (demux->key_url, key) == 0) {
1374     key_fragment = g_object_ref (demux->key_fragment);
1375   } else {
1376     g_free (demux->key_url);
1377     demux->key_url = NULL;
1378
1379     if (demux->key_fragment)
1380       g_object_unref (demux->key_fragment);
1381     demux->key_fragment = NULL;
1382
1383     GST_INFO_OBJECT (demux, "Fetching key %s", key);
1384     key_fragment =
1385         gst_uri_downloader_fetch_uri (demux->downloader, key, FALSE, err);
1386     if (key_fragment == NULL)
1387       goto key_failed;
1388     demux->key_url = g_strdup (key);
1389     demux->key_fragment = g_object_ref (key_fragment);
1390   }
1391
1392   key_buffer = gst_fragment_get_buffer (key_fragment);
1393   encrypted_buffer = gst_fragment_get_buffer (encrypted_fragment);
1394   decrypted_buffer =
1395       gst_buffer_new_allocate (NULL, gst_buffer_get_size (encrypted_buffer),
1396       NULL);
1397
1398   gst_buffer_map (key_buffer, &key_info, GST_MAP_READ);
1399   gst_buffer_map (encrypted_buffer, &encrypted_info, GST_MAP_READ);
1400   gst_buffer_map (decrypted_buffer, &decrypted_info, GST_MAP_WRITE);
1401
1402   if (key_info.size != 16)
1403     goto decrypt_error;
1404   if (!decrypt_fragment (demux, encrypted_info.size,
1405           encrypted_info.data, decrypted_info.data, key_info.data, iv))
1406     goto decrypt_error;
1407
1408   /* Handle pkcs7 unpadding here */
1409   unpadded_size =
1410       decrypted_info.size - decrypted_info.data[decrypted_info.size - 1];
1411
1412   gst_buffer_unmap (decrypted_buffer, &decrypted_info);
1413   gst_buffer_unmap (encrypted_buffer, &encrypted_info);
1414   gst_buffer_unmap (key_buffer, &key_info);
1415
1416   gst_buffer_resize (decrypted_buffer, 0, unpadded_size);
1417
1418   gst_buffer_unref (key_buffer);
1419   gst_buffer_unref (encrypted_buffer);
1420   g_object_unref (key_fragment);
1421
1422   ret = gst_fragment_new ();
1423   gst_fragment_add_buffer (ret, decrypted_buffer);
1424   ret->completed = TRUE;
1425 key_failed:
1426   g_object_unref (encrypted_fragment);
1427   return ret;
1428
1429 decrypt_error:
1430   GST_ERROR_OBJECT (demux, "Failed to decrypt fragment");
1431   g_set_error (err, GST_STREAM_ERROR, GST_STREAM_ERROR_DECRYPT,
1432       "Failed to decrypt fragment");
1433
1434   gst_buffer_unmap (decrypted_buffer, &decrypted_info);
1435   gst_buffer_unmap (encrypted_buffer, &encrypted_info);
1436   gst_buffer_unmap (key_buffer, &key_info);
1437
1438   gst_buffer_unref (key_buffer);
1439   gst_buffer_unref (encrypted_buffer);
1440   gst_buffer_unref (decrypted_buffer);
1441
1442   g_object_unref (key_fragment);
1443   g_object_unref (encrypted_fragment);
1444   return ret;
1445 }
1446
1447 static gboolean
1448 gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching,
1449     GError ** err)
1450 {
1451   GstFragment *download;
1452   const gchar *next_fragment_uri;
1453   GstClockTime duration;
1454   GstClockTime timestamp;
1455   GstBuffer *buf;
1456   gboolean discont;
1457   const gchar *key = NULL;
1458   const guint8 *iv = NULL;
1459
1460   if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
1461           &next_fragment_uri, &duration, &timestamp, &key, &iv)) {
1462     GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
1463     demux->end_of_playlist = TRUE;
1464     gst_task_start (demux->stream_task);
1465     return FALSE;
1466   }
1467
1468   GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
1469
1470   download = gst_uri_downloader_fetch_uri (demux->downloader,
1471       next_fragment_uri, FALSE, err);
1472
1473   if (download == NULL)
1474     goto error;
1475
1476   if (key) {
1477     download = gst_hls_demux_decrypt_fragment (demux, download, key, iv, err);
1478     if (download == NULL)
1479       goto error;
1480   }
1481
1482   buf = gst_fragment_get_buffer (download);
1483
1484   GST_DEBUG_OBJECT (demux, "set fragment pts=%" GST_TIME_FORMAT " duration=%"
1485       GST_TIME_FORMAT, GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
1486
1487   GST_BUFFER_DURATION (buf) = duration;
1488   GST_BUFFER_PTS (buf) = timestamp;
1489
1490   /* We actually need to do this every time we switch bitrate */
1491   if (G_UNLIKELY (demux->do_typefind)) {
1492     GstCaps *caps = gst_fragment_get_caps (download);
1493
1494     if (G_UNLIKELY (!caps)) {
1495       GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
1496           ("Could not determine type of stream"), (NULL));
1497       gst_buffer_unref (buf);
1498       g_object_unref (download);
1499       goto error;
1500     }
1501
1502     if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
1503       gst_caps_replace (&demux->input_caps, caps);
1504       /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
1505       GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
1506           demux->input_caps);
1507       demux->do_typefind = FALSE;
1508     }
1509     gst_caps_unref (caps);
1510   } else {
1511     gst_fragment_set_caps (download, demux->input_caps);
1512   }
1513
1514   if (discont) {
1515     GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
1516     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1517   }
1518
1519   /* The buffer ref is still kept inside the fragment download */
1520   gst_buffer_unref (buf);
1521
1522   GST_DEBUG_OBJECT (demux, "Pushing fragment in queue");
1523   g_queue_push_tail (demux->queue, download);
1524   if (!caching) {
1525     GST_TASK_SIGNAL (demux->updates_task);
1526     gst_task_start (demux->stream_task);
1527   }
1528   return TRUE;
1529
1530 error:
1531   {
1532     return FALSE;
1533   }
1534 }