8de5b417918ce28cb311d73364214fcba92b3103
[platform/upstream/gstreamer.git] / gst / hls / gsthlsdemux.c
1 /* GStreamer
2  * Copyright (C) 2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
3  * Copyright (C) 2010 Andoni Morales Alastruey <ylatuya@gmail.com>
4  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
5  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
6  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
7  *
8  * Gsthlsdemux.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25 /**
26  * SECTION:element-hlsdemux
27  *
28  * HTTP Live Streaming demuxer element.
29  *
30  * <refsect2>
31  * <title>Example launch line</title>
32  * |[
33  * gst-launch souphttpsrc location=http://devimages.apple.com/iphone/samples/bipbop/gear4/prog_index.m3u8 ! hlsdemux ! decodebin2 ! ffmpegcolorspace ! videoscale ! autovideosink
34  * ]|
35  * </refsect2>
36  *
37  * Last reviewed on 2010-10-07
38  */
39
40 #ifdef HAVE_CONFIG_H
41 #  include "config.h"
42 #endif
43
44 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
45  * with newer GLib versions (>= 2.31.0) */
46 #define GLIB_DISABLE_DEPRECATION_WARNINGS
47
48 #include <string.h>
49 #include <gst/glib-compat-private.h>
50 #include "gsthlsdemux.h"
51
52 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
53     GST_PAD_SRC,
54     GST_PAD_SOMETIMES,
55     GST_STATIC_CAPS_ANY);
56
57 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
58     GST_PAD_SINK,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS ("application/x-hls"));
61
62 GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug);
63 #define GST_CAT_DEFAULT gst_hls_demux_debug
64
65 enum
66 {
67   PROP_0,
68
69   PROP_FRAGMENTS_CACHE,
70   PROP_BITRATE_LIMIT,
71   PROP_CONNECTION_SPEED,
72   PROP_LAST
73 };
74
75 static const float update_interval_factor[] = { 1, 0.5, 1.5, 3 };
76
77 #define DEFAULT_FRAGMENTS_CACHE 3
78 #define DEFAULT_FAILED_COUNT 3
79 #define DEFAULT_BITRATE_LIMIT 0.8
80 #define DEFAULT_CONNECTION_SPEED    0
81
82 /* GObject */
83 static void gst_hls_demux_set_property (GObject * object, guint prop_id,
84     const GValue * value, GParamSpec * pspec);
85 static void gst_hls_demux_get_property (GObject * object, guint prop_id,
86     GValue * value, GParamSpec * pspec);
87 static void gst_hls_demux_dispose (GObject * obj);
88
89 /* GstElement */
90 static GstStateChangeReturn
91 gst_hls_demux_change_state (GstElement * element, GstStateChange transition);
92
93 /* GstHLSDemux */
94 static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstObject * parent,
95     GstBuffer * buf);
96 static gboolean gst_hls_demux_sink_event (GstPad * pad, GstObject * parent,
97     GstEvent * event);
98 static gboolean gst_hls_demux_src_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102 static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
103 static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
104 static void gst_hls_demux_stop (GstHLSDemux * demux);
105 static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux);
106 static gboolean gst_hls_demux_schedule (GstHLSDemux * demux);
107 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux);
108 static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
109     gboolean caching);
110 static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
111     gboolean update);
112 static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose);
113 static gboolean gst_hls_demux_set_location (GstHLSDemux * demux,
114     const gchar * uri);
115 static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
116
117 #define gst_hls_demux_parent_class parent_class
118 G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
119
120 static void
121 gst_hls_demux_dispose (GObject * obj)
122 {
123   GstHLSDemux *demux = GST_HLS_DEMUX (obj);
124
125   if (demux->stream_task) {
126     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
127       GST_DEBUG_OBJECT (demux, "Leaving streaming task");
128       gst_task_stop (demux->stream_task);
129       gst_task_join (demux->stream_task);
130     }
131     gst_object_unref (demux->stream_task);
132     g_rec_mutex_clear (&demux->stream_lock);
133     demux->stream_task = NULL;
134   }
135
136   if (demux->updates_task) {
137     if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
138       GST_DEBUG_OBJECT (demux, "Leaving updates task");
139       gst_task_stop (demux->updates_task);
140       gst_task_join (demux->updates_task);
141     }
142     gst_object_unref (demux->updates_task);
143     g_mutex_clear (&demux->updates_timed_lock);
144     g_rec_mutex_clear (&demux->updates_lock);
145     demux->updates_task = NULL;
146   }
147
148   if (demux->downloader != NULL) {
149     g_object_unref (demux->downloader);
150     demux->downloader = NULL;
151   }
152
153   gst_hls_demux_reset (demux, TRUE);
154
155   g_queue_free (demux->queue);
156
157   G_OBJECT_CLASS (parent_class)->dispose (obj);
158 }
159
160 static void
161 gst_hls_demux_class_init (GstHLSDemuxClass * klass)
162 {
163   GObjectClass *gobject_class;
164   GstElementClass *element_class;
165
166   gobject_class = (GObjectClass *) klass;
167   element_class = (GstElementClass *) klass;
168
169   gobject_class->set_property = gst_hls_demux_set_property;
170   gobject_class->get_property = gst_hls_demux_get_property;
171   gobject_class->dispose = gst_hls_demux_dispose;
172
173   g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
174       g_param_spec_uint ("fragments-cache", "Fragments cache",
175           "Number of fragments needed to be cached to start playing",
176           2, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
177           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178
179   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
180       g_param_spec_float ("bitrate-limit",
181           "Bitrate limit in %",
182           "Limit of the available bitrate to use when switching to alternates.",
183           0, 1, DEFAULT_BITRATE_LIMIT,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
187       g_param_spec_uint ("connection-speed", "Connection Speed",
188           "Network connection speed in kbps (0 = unknown)",
189           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
190           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   element_class->change_state = GST_DEBUG_FUNCPTR (gst_hls_demux_change_state);
193
194   gst_element_class_add_pad_template (element_class,
195       gst_static_pad_template_get (&srctemplate));
196
197   gst_element_class_add_pad_template (element_class,
198       gst_static_pad_template_get (&sinktemplate));
199
200   gst_element_class_set_details_simple (element_class,
201       "HLS Demuxer",
202       "Demuxer/URIList",
203       "HTTP Live Streaming demuxer",
204       "Marc-Andre Lureau <marcandre.lureau@gmail.com>\n"
205       "Andoni Morales Alastruey <ylatuya@gmail.com>");
206
207   GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
208       "hlsdemux element");
209 }
210
211 static void
212 gst_hls_demux_init (GstHLSDemux * demux)
213 {
214   /* sink pad */
215   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
216   gst_pad_set_chain_function (demux->sinkpad,
217       GST_DEBUG_FUNCPTR (gst_hls_demux_chain));
218   gst_pad_set_event_function (demux->sinkpad,
219       GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event));
220   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
221
222   /* Downloader */
223   demux->downloader = gst_uri_downloader_new ();
224
225   demux->do_typefind = TRUE;
226
227   /* Properties */
228   demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
229   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
230   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
231
232   demux->queue = g_queue_new ();
233
234   /* Updates task */
235   g_rec_mutex_init (&demux->updates_lock);
236   demux->updates_task =
237       gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux);
238   gst_task_set_lock (demux->updates_task, &demux->updates_lock);
239   g_mutex_init (&demux->updates_timed_lock);
240
241   /* Streaming task */
242   g_rec_mutex_init (&demux->stream_lock);
243   demux->stream_task =
244       gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux);
245   gst_task_set_lock (demux->stream_task, &demux->stream_lock);
246 }
247
248 static void
249 gst_hls_demux_set_property (GObject * object, guint prop_id,
250     const GValue * value, GParamSpec * pspec)
251 {
252   GstHLSDemux *demux = GST_HLS_DEMUX (object);
253
254   switch (prop_id) {
255     case PROP_FRAGMENTS_CACHE:
256       demux->fragments_cache = g_value_get_uint (value);
257       break;
258     case PROP_BITRATE_LIMIT:
259       demux->bitrate_limit = g_value_get_float (value);
260       break;
261     case PROP_CONNECTION_SPEED:
262       demux->connection_speed = g_value_get_uint (value) * 1000;
263       break;
264     default:
265       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
266       break;
267   }
268 }
269
270 static void
271 gst_hls_demux_get_property (GObject * object, guint prop_id, GValue * value,
272     GParamSpec * pspec)
273 {
274   GstHLSDemux *demux = GST_HLS_DEMUX (object);
275
276   switch (prop_id) {
277     case PROP_FRAGMENTS_CACHE:
278       g_value_set_uint (value, demux->fragments_cache);
279       break;
280     case PROP_BITRATE_LIMIT:
281       g_value_set_float (value, demux->bitrate_limit);
282       break;
283     case PROP_CONNECTION_SPEED:
284       g_value_set_uint (value, demux->connection_speed / 1000);
285       break;
286     default:
287       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
288       break;
289   }
290 }
291
292 static GstStateChangeReturn
293 gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
294 {
295   GstStateChangeReturn ret;
296   GstHLSDemux *demux = GST_HLS_DEMUX (element);
297
298   switch (transition) {
299     case GST_STATE_CHANGE_READY_TO_PAUSED:
300       gst_hls_demux_reset (demux, FALSE);
301       break;
302     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
303       /* Start the streaming loop in paused only if we already received
304          the main playlist. It might have been stopped if we were in PAUSED
305          state and we filled our queue with enough cached fragments
306        */
307       if (gst_m3u8_client_get_uri (demux->client)[0] != '\0')
308         gst_task_start (demux->updates_task);
309       break;
310     default:
311       break;
312   }
313
314   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
315
316   switch (transition) {
317     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
318       gst_task_stop (demux->updates_task);
319       break;
320     case GST_STATE_CHANGE_PAUSED_TO_READY:
321       demux->cancelled = TRUE;
322       gst_hls_demux_stop (demux);
323       gst_task_join (demux->stream_task);
324       gst_hls_demux_reset (demux, FALSE);
325       break;
326     default:
327       break;
328   }
329   return ret;
330 }
331
332 static gboolean
333 gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
334 {
335   GstHLSDemux *demux;
336
337   demux = GST_HLS_DEMUX (parent);
338
339   switch (event->type) {
340     case GST_EVENT_SEEK:
341     {
342       gdouble rate;
343       GstFormat format;
344       GstSeekFlags flags;
345       GstSeekType start_type, stop_type;
346       gint64 start, stop;
347       GList *walk;
348       GstClockTime position, current_pos, target_pos;
349       gint current_sequence;
350       GstM3U8MediaFile *file;
351
352       GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
353
354       if (gst_m3u8_client_is_live (demux->client)) {
355         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
356         return FALSE;
357       }
358
359       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
360           &stop_type, &stop);
361
362       if (format != GST_FORMAT_TIME)
363         return FALSE;
364
365       GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
366           " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
367           GST_TIME_ARGS (stop));
368
369       GST_M3U8_CLIENT_LOCK (demux->client);
370       file = GST_M3U8_MEDIA_FILE (demux->client->current->files->data);
371       current_sequence = file->sequence;
372       current_pos = 0;
373       target_pos = (GstClockTime) start;
374       for (walk = demux->client->current->files; walk; walk = walk->next) {
375         file = walk->data;
376
377         current_sequence = file->sequence;
378         if (current_pos <= target_pos
379             && target_pos < current_pos + file->duration) {
380           break;
381         }
382         current_pos += file->duration;
383       }
384       GST_M3U8_CLIENT_UNLOCK (demux->client);
385
386       if (walk == NULL) {
387         GST_WARNING_OBJECT (demux, "Could not find seeked fragment");
388         return FALSE;
389       }
390
391       if (flags & GST_SEEK_FLAG_FLUSH) {
392         GST_DEBUG_OBJECT (demux, "sending flush start");
393         gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ());
394       }
395
396       demux->cancelled = TRUE;
397       gst_task_pause (demux->stream_task);
398       gst_uri_downloader_cancel (demux->downloader);
399       gst_task_stop (demux->updates_task);
400       gst_task_pause (demux->stream_task);
401
402       /* wait for streaming to finish */
403       g_rec_mutex_lock (&demux->stream_lock);
404
405       demux->need_cache = TRUE;
406       while (!g_queue_is_empty (demux->queue)) {
407         GstFragment *fragment = g_queue_pop_head (demux->queue);
408         g_object_unref (fragment);
409       }
410       g_queue_clear (demux->queue);
411
412       GST_M3U8_CLIENT_LOCK (demux->client);
413       GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence);
414       demux->client->sequence = current_sequence;
415       gst_m3u8_client_get_current_position (demux->client, &position);
416       demux->position_shift = start - position;
417       demux->need_segment = TRUE;
418       GST_M3U8_CLIENT_UNLOCK (demux->client);
419
420
421       if (flags & GST_SEEK_FLAG_FLUSH) {
422         GST_DEBUG_OBJECT (demux, "sending flush stop");
423         gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE));
424       }
425
426       demux->cancelled = FALSE;
427       gst_task_start (demux->stream_task);
428       g_rec_mutex_unlock (&demux->stream_lock);
429
430       return TRUE;
431     }
432     default:
433       break;
434   }
435
436   return gst_pad_event_default (pad, parent, event);
437 }
438
439 static gboolean
440 gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
441 {
442   GstHLSDemux *demux;
443   GstQuery *query;
444   gboolean ret;
445   gchar *uri;
446
447   demux = GST_HLS_DEMUX (parent);
448
449   switch (event->type) {
450     case GST_EVENT_EOS:{
451       gchar *playlist = NULL;
452
453       if (demux->playlist == NULL) {
454         GST_WARNING_OBJECT (demux, "Received EOS without a playlist.");
455         break;
456       }
457
458       GST_DEBUG_OBJECT (demux,
459           "Got EOS on the sink pad: main playlist fetched");
460
461       query = gst_query_new_uri ();
462       ret = gst_pad_peer_query (demux->sinkpad, query);
463       if (ret) {
464         gst_query_parse_uri (query, &uri);
465         gst_hls_demux_set_location (demux, uri);
466         g_free (uri);
467       }
468       gst_query_unref (query);
469
470       playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist);
471       demux->playlist = NULL;
472       if (playlist == NULL) {
473         GST_WARNING_OBJECT (demux, "Error validating first playlist.");
474       } else if (!gst_m3u8_client_update (demux->client, playlist)) {
475         /* In most cases, this will happen if we set a wrong url in the
476          * source element and we have received the 404 HTML response instead of
477          * the playlist */
478         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
479             (NULL));
480         return FALSE;
481       }
482
483       if (!ret && gst_m3u8_client_is_live (demux->client)) {
484         GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
485             ("Failed querying the playlist uri, "
486                 "required for live sources."), (NULL));
487         return FALSE;
488       }
489
490       gst_task_start (demux->stream_task);
491       gst_event_unref (event);
492       return TRUE;
493     }
494     case GST_EVENT_SEGMENT:
495       /* Swallow newsegments, we'll push our own */
496       gst_event_unref (event);
497       return TRUE;
498     default:
499       break;
500   }
501
502   return gst_pad_event_default (pad, parent, event);
503 }
504
505 static gboolean
506 gst_hls_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
507 {
508   GstHLSDemux *hlsdemux;
509   gboolean ret = FALSE;
510
511   if (query == NULL)
512     return FALSE;
513
514   hlsdemux = GST_HLS_DEMUX (parent);
515
516   switch (query->type) {
517     case GST_QUERY_DURATION:{
518       GstClockTime duration = -1;
519       GstFormat fmt;
520
521       gst_query_parse_duration (query, &fmt, NULL);
522       if (fmt == GST_FORMAT_TIME) {
523         duration = gst_m3u8_client_get_duration (hlsdemux->client);
524         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
525           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
526           ret = TRUE;
527         }
528       }
529       GST_INFO_OBJECT (hlsdemux, "GST_QUERY_DURATION returns %s with duration %"
530           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
531       break;
532     }
533     case GST_QUERY_URI:
534       if (hlsdemux->client) {
535         /* FIXME: Do we answer with the variant playlist, with the current
536          * playlist or the the uri of the least downlowaded fragment? */
537         gst_query_set_uri (query, gst_m3u8_client_get_uri (hlsdemux->client));
538         ret = TRUE;
539       }
540       break;
541     case GST_QUERY_SEEKING:{
542       GstFormat fmt;
543       gint64 stop = -1;
544
545       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
546       GST_INFO_OBJECT (hlsdemux, "Received GST_QUERY_SEEKING with format %d",
547           fmt);
548       if (fmt == GST_FORMAT_TIME) {
549         GstClockTime duration;
550
551         duration = gst_m3u8_client_get_duration (hlsdemux->client);
552         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
553           stop = duration;
554
555         gst_query_set_seeking (query, fmt,
556             !gst_m3u8_client_is_live (hlsdemux->client), 0, stop);
557         ret = TRUE;
558         GST_INFO_OBJECT (hlsdemux, "GST_QUERY_SEEKING returning with stop : %"
559             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
560       }
561       break;
562     }
563     default:
564       /* Don't fordward queries upstream because of the special nature of this
565        * "demuxer", which relies on the upstream element only to be fed with the
566        * first playlist */
567       break;
568   }
569
570   return ret;
571 }
572
573 static GstFlowReturn
574 gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
575 {
576   GstHLSDemux *demux = GST_HLS_DEMUX (parent);
577
578   if (demux->playlist == NULL)
579     demux->playlist = buf;
580   else
581     demux->playlist = gst_buffer_append (demux->playlist, buf);
582
583   return GST_FLOW_OK;
584 }
585
586 static void
587 gst_hls_demux_stop (GstHLSDemux * demux)
588 {
589   gst_uri_downloader_cancel (demux->downloader);
590
591   if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
592     demux->stop_stream_task = TRUE;
593     gst_task_stop (demux->updates_task);
594     GST_TASK_SIGNAL (demux->updates_task);
595   }
596
597   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED)
598     gst_task_stop (demux->stream_task);
599 }
600
601 static void
602 switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
603 {
604   GstPad *oldpad = demux->srcpad;
605
606   GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
607       newcaps);
608
609   /* First create and activate new pad */
610   demux->srcpad = gst_pad_new_from_static_template (&srctemplate, NULL);
611   gst_pad_set_event_function (demux->srcpad,
612       GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
613   gst_pad_set_query_function (demux->srcpad,
614       GST_DEBUG_FUNCPTR (gst_hls_demux_src_query));
615   gst_pad_set_element_private (demux->srcpad, demux);
616   gst_pad_set_active (demux->srcpad, TRUE);
617   gst_pad_push_event (demux->srcpad, gst_event_new_stream_start ());
618   gst_pad_set_caps (demux->srcpad, newcaps);
619   gst_pad_push_event (demux->srcpad, gst_event_new_caps (newcaps));
620   gst_element_add_pad (GST_ELEMENT (demux), demux->srcpad);
621
622   gst_element_no_more_pads (GST_ELEMENT (demux));
623
624   if (oldpad) {
625     /* Push out EOS */
626     gst_pad_push_event (oldpad, gst_event_new_eos ());
627     gst_pad_set_active (oldpad, FALSE);
628     gst_element_remove_pad (GST_ELEMENT (demux), oldpad);
629   }
630 }
631
632 static void
633 gst_hls_demux_stream_loop (GstHLSDemux * demux)
634 {
635   GstFragment *fragment;
636   GstBuffer *buf;
637   GstFlowReturn ret;
638   GstCaps *bufcaps, *srccaps = NULL;
639
640   /* Loop for the source pad task. The task is started when we have
641    * received the main playlist from the source element. It tries first to
642    * cache the first fragments and then it waits until it has more data in the
643    * queue. This task is woken up when we push a new fragment to the queue or
644    * when we reached the end of the playlist  */
645
646   if (G_UNLIKELY (demux->need_cache)) {
647     if (!gst_hls_demux_cache_fragments (demux))
648       goto cache_error;
649
650     /* we can start now the updates thread (only if on playing) */
651     if (GST_STATE (demux) == GST_STATE_PLAYING)
652       gst_task_start (demux->updates_task);
653     GST_INFO_OBJECT (demux, "First fragments cached successfully");
654   }
655
656   if (g_queue_is_empty (demux->queue)) {
657     if (demux->end_of_playlist)
658       goto end_of_playlist;
659
660     goto pause_task;
661   }
662
663   fragment = g_queue_pop_head (demux->queue);
664   buf = gst_fragment_get_buffer (fragment);
665
666   /* Figure out if we need to create/switch pads */
667   if (G_LIKELY (demux->srcpad))
668     srccaps = gst_pad_get_current_caps (demux->srcpad);
669   bufcaps = gst_fragment_get_caps (fragment);
670   if (G_UNLIKELY (!srccaps || !gst_caps_is_equal_fixed (bufcaps, srccaps)
671           || demux->need_segment)) {
672     switch_pads (demux, bufcaps);
673     demux->need_segment = TRUE;
674   }
675   gst_caps_unref (bufcaps);
676   if (G_LIKELY (srccaps))
677     gst_caps_unref (srccaps);
678   g_object_unref (fragment);
679
680   if (demux->need_segment) {
681     GstSegment segment;
682     GstClockTime start = GST_BUFFER_PTS (buf);
683
684     start += demux->position_shift;
685     /* And send a newsegment */
686     GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
687         GST_TIME_FORMAT, GST_TIME_ARGS (start));
688     gst_segment_init (&segment, GST_FORMAT_TIME);
689     segment.start = start;
690     segment.time = start;
691     gst_pad_push_event (demux->srcpad, gst_event_new_segment (&segment));
692     demux->need_segment = FALSE;
693     demux->position_shift = 0;
694   }
695
696   ret = gst_pad_push (demux->srcpad, buf);
697   if (ret != GST_FLOW_OK)
698     goto error_pushing;
699
700   return;
701
702 end_of_playlist:
703   {
704     GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS");
705     gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
706     gst_hls_demux_stop (demux);
707     return;
708   }
709
710 cache_error:
711   {
712     gst_task_pause (demux->stream_task);
713     if (!demux->cancelled) {
714       GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
715           ("Could not cache the first fragments"), (NULL));
716       gst_hls_demux_stop (demux);
717     }
718     return;
719   }
720
721 error_pushing:
722   {
723     /* FIXME: handle error */
724     GST_DEBUG_OBJECT (demux, "Error pushing buffer: %s... stopping task",
725         gst_flow_get_name (ret));
726     gst_hls_demux_stop (demux);
727     return;
728   }
729
730 pause_task:
731   {
732     gst_task_pause (demux->stream_task);
733     return;
734   }
735 }
736
737 static void
738 gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
739 {
740   demux->need_cache = TRUE;
741   demux->end_of_playlist = FALSE;
742   demux->cancelled = FALSE;
743   demux->do_typefind = TRUE;
744
745   if (demux->input_caps) {
746     gst_caps_unref (demux->input_caps);
747     demux->input_caps = NULL;
748   }
749
750   if (demux->playlist) {
751     gst_buffer_unref (demux->playlist);
752     demux->playlist = NULL;
753   }
754
755   if (demux->client) {
756     gst_m3u8_client_free (demux->client);
757     demux->client = NULL;
758   }
759
760   if (!dispose) {
761     demux->client = gst_m3u8_client_new ("");
762   }
763
764   while (!g_queue_is_empty (demux->queue)) {
765     GstFragment *fragment = g_queue_pop_head (demux->queue);
766     g_object_unref (fragment);
767   }
768   g_queue_clear (demux->queue);
769
770   demux->position_shift = 0;
771   demux->need_segment = TRUE;
772 }
773
774 static gboolean
775 gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri)
776 {
777   if (demux->client)
778     gst_m3u8_client_free (demux->client);
779   demux->client = gst_m3u8_client_new (uri);
780   GST_INFO_OBJECT (demux, "Changed location: %s", uri);
781   return TRUE;
782 }
783
784 void
785 gst_hls_demux_updates_loop (GstHLSDemux * demux)
786 {
787   /* Loop for the updates. It's started when the first fragments are cached and
788    * schedules the next update of the playlist (for lives sources) and the next
789    * update of fragments. When a new fragment is downloaded, it compares the
790    * download time with the next scheduled update to check if we can or should
791    * switch to a different bitrate */
792
793   /* block until the next scheduled update or the signal to quit this thread */
794   g_mutex_lock (&demux->updates_timed_lock);
795   GST_DEBUG_OBJECT (demux, "Started updates task");
796   while (TRUE) {
797     /* schedule the next update */
798     gst_hls_demux_schedule (demux);
799
800     /*  block until the next scheduled update or the signal to quit this thread */
801     if (g_cond_timed_wait (GST_TASK_GET_COND (demux->updates_task),
802             &demux->updates_timed_lock, &demux->next_update)) {
803       goto quit;
804     }
805     /* update the playlist for live sources */
806     if (gst_m3u8_client_is_live (demux->client)) {
807       if (!gst_hls_demux_update_playlist (demux, TRUE)) {
808         demux->client->update_failed_count++;
809         if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
810           GST_WARNING_OBJECT (demux, "Could not update the playlist");
811           continue;
812         } else {
813           GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
814               ("Could not update the playlist"), (NULL));
815           goto quit;
816         }
817       }
818     }
819
820     /* if it's a live source and the playlist couldn't be updated, there aren't
821      * more fragments in the playlist, so we just wait for the next schedulled
822      * update */
823     if (gst_m3u8_client_is_live (demux->client) &&
824         demux->client->update_failed_count > 0) {
825       GST_WARNING_OBJECT (demux,
826           "The playlist hasn't been updated, failed count is %d",
827           demux->client->update_failed_count);
828       continue;
829     }
830
831     /* fetch the next fragment */
832     if (g_queue_is_empty (demux->queue)) {
833       if (!gst_hls_demux_get_next_fragment (demux, FALSE)) {
834         if (!demux->end_of_playlist && !demux->cancelled) {
835           demux->client->update_failed_count++;
836           if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
837             GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
838             continue;
839           } else {
840             GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
841                 ("Could not fetch the next fragment"), (NULL));
842             goto quit;
843           }
844         }
845       } else {
846         demux->client->update_failed_count = 0;
847
848         /* try to switch to another bitrate if needed */
849         gst_hls_demux_switch_playlist (demux);
850       }
851     }
852   }
853
854 quit:
855   {
856     GST_DEBUG_OBJECT (demux, "Stopped updates task");
857     gst_hls_demux_stop (demux);
858     g_mutex_unlock (&demux->updates_timed_lock);
859   }
860 }
861
862 static gboolean
863 gst_hls_demux_cache_fragments (GstHLSDemux * demux)
864 {
865   gint i;
866
867   /* If this playlist is a variant playlist, select the first one
868    * and update it */
869   if (gst_m3u8_client_has_variant_playlist (demux->client)) {
870     GstM3U8 *child = NULL;
871
872     if (demux->connection_speed == 0) {
873
874       GST_M3U8_CLIENT_LOCK (demux->client);
875       child = demux->client->main->current_variant->data;
876       GST_M3U8_CLIENT_UNLOCK (demux->client);
877     } else {
878       GList *tmp = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
879           demux->connection_speed);
880
881       child = GST_M3U8 (tmp->data);
882     }
883
884     gst_m3u8_client_set_current (demux->client, child);
885     if (!gst_hls_demux_update_playlist (demux, FALSE)) {
886       GST_ERROR_OBJECT (demux, "Could not fetch the child playlist %s",
887           child->uri);
888       return FALSE;
889     }
890   }
891
892   if (!gst_m3u8_client_is_live (demux->client)) {
893     GstClockTime duration = gst_m3u8_client_get_duration (demux->client);
894
895     GST_DEBUG_OBJECT (demux, "Sending duration message : %" GST_TIME_FORMAT,
896         GST_TIME_ARGS (duration));
897     if (duration != GST_CLOCK_TIME_NONE)
898       gst_element_post_message (GST_ELEMENT (demux),
899           gst_message_new_duration (GST_OBJECT (demux),
900               GST_FORMAT_TIME, duration));
901   }
902
903   /* Cache the first fragments */
904   for (i = 0; i < demux->fragments_cache; i++) {
905     gst_element_post_message (GST_ELEMENT (demux),
906         gst_message_new_buffering (GST_OBJECT (demux),
907             100 * i / demux->fragments_cache));
908     g_get_current_time (&demux->next_update);
909     if (!gst_hls_demux_get_next_fragment (demux, TRUE)) {
910       if (demux->end_of_playlist)
911         break;
912       if (!demux->cancelled)
913         GST_ERROR_OBJECT (demux, "Error caching the first fragments");
914       return FALSE;
915     }
916     /* make sure we stop caching fragments if something cancelled it */
917     if (demux->cancelled)
918       return FALSE;
919     gst_hls_demux_switch_playlist (demux);
920   }
921   gst_element_post_message (GST_ELEMENT (demux),
922       gst_message_new_buffering (GST_OBJECT (demux), 100));
923
924   g_get_current_time (&demux->next_update);
925
926   demux->need_cache = FALSE;
927   return TRUE;
928
929 }
930
931 static gchar *
932 gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf)
933 {
934   GstMapInfo info;
935   gchar *playlist;
936
937   if (!gst_buffer_map (buf, &info, GST_MAP_READ))
938     return NULL;
939
940   if (!g_utf8_validate ((gchar *) info.data, info.size, NULL))
941     goto validate_error;
942
943   /* alloc size + 1 to end with a null character */
944   playlist = g_malloc0 (info.size + 1);
945   memcpy (playlist, info.data, info.size + 1);
946
947   gst_buffer_unmap (buf, &info);
948   gst_buffer_unref (buf);
949   return playlist;
950
951 validate_error:
952   gst_buffer_unmap (buf, &info);
953   gst_buffer_unref (buf);
954   return NULL;
955 }
956
957 static gboolean
958 gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update)
959 {
960   GstFragment *download;
961   GstBuffer *buf;
962   gchar *playlist;
963   gboolean updated = FALSE;
964
965   const gchar *uri = gst_m3u8_client_get_current_uri (demux->client);
966
967   download = gst_uri_downloader_fetch_uri (demux->downloader, uri);
968
969   if (download == NULL)
970     return FALSE;
971
972   buf = gst_fragment_get_buffer (download);
973   playlist = gst_hls_src_buf_to_utf8_playlist (buf);
974   g_object_unref (download);
975
976   if (playlist == NULL) {
977     GST_WARNING_OBJECT (demux, "Couldn't not validate playlist encoding");
978     return FALSE;
979   }
980
981   updated = gst_m3u8_client_update (demux->client, playlist);
982
983   /*  If it's a live source, do not let the sequence number go beyond
984    * three fragments before the end of the list */
985   if (updated && update == FALSE && demux->client->current &&
986       gst_m3u8_client_is_live (demux->client)) {
987     guint last_sequence;
988
989     GST_M3U8_CLIENT_LOCK (demux->client);
990     last_sequence =
991         GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
992             files)->data)->sequence;
993
994     if (demux->client->sequence >= last_sequence - 3) {
995       GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %d",
996           last_sequence - 3);
997       demux->need_segment = TRUE;
998       demux->client->sequence = last_sequence - 3;
999     }
1000     GST_M3U8_CLIENT_UNLOCK (demux->client);
1001   }
1002
1003   return updated;
1004 }
1005
1006 static gboolean
1007 gst_hls_demux_change_playlist (GstHLSDemux * demux, guint max_bitrate)
1008 {
1009   GList *previous_variant, *current_variant;
1010   gint old_bandwidth, new_bandwidth;
1011
1012   /* If user specifies a connection speed never use a playlist with a bandwidth
1013    * superior than it */
1014   if (demux->connection_speed != 0 && max_bitrate > demux->connection_speed)
1015     max_bitrate = demux->connection_speed;
1016
1017   previous_variant = demux->client->main->current_variant;
1018   current_variant = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
1019       max_bitrate);
1020
1021 retry_failover_protection:
1022   old_bandwidth = GST_M3U8 (previous_variant->data)->bandwidth;
1023   new_bandwidth = GST_M3U8 (current_variant->data)->bandwidth;
1024
1025   /* Don't do anything else if the playlist is the same */
1026   if (new_bandwidth == old_bandwidth) {
1027     return TRUE;
1028   }
1029
1030   demux->client->main->current_variant = current_variant;
1031   GST_M3U8_CLIENT_UNLOCK (demux->client);
1032
1033   gst_m3u8_client_set_current (demux->client, current_variant->data);
1034
1035   GST_INFO_OBJECT (demux, "Client was on %dbps, max allowed is %dbps, switching"
1036       " to bitrate %dbps", old_bandwidth, max_bitrate, new_bandwidth);
1037
1038   if (gst_hls_demux_update_playlist (demux, FALSE)) {
1039     GstStructure *s;
1040
1041     s = gst_structure_new ("playlist",
1042         "uri", G_TYPE_STRING, gst_m3u8_client_get_current_uri (demux->client),
1043         "bitrate", G_TYPE_INT, new_bandwidth, NULL);
1044     gst_element_post_message (GST_ELEMENT_CAST (demux),
1045         gst_message_new_element (GST_OBJECT_CAST (demux), s));
1046   } else {
1047     GList *failover = NULL;
1048
1049     GST_INFO_OBJECT (demux, "Unable to update playlist. Switching back");
1050     GST_M3U8_CLIENT_LOCK (demux->client);
1051
1052     failover = g_list_previous (current_variant);
1053     if (failover && new_bandwidth == GST_M3U8 (failover->data)->bandwidth) {
1054       current_variant = failover;
1055       goto retry_failover_protection;
1056     }
1057
1058     demux->client->main->current_variant = previous_variant;
1059     GST_M3U8_CLIENT_UNLOCK (demux->client);
1060     gst_m3u8_client_set_current (demux->client, previous_variant->data);
1061     /*  Try a lower bitrate (or stop if we just tried the lowest) */
1062     if (new_bandwidth ==
1063         GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
1064       return FALSE;
1065     else
1066       return gst_hls_demux_change_playlist (demux, new_bandwidth - 1);
1067   }
1068
1069   /* Force typefinding since we might have changed media type */
1070   demux->do_typefind = TRUE;
1071
1072   return TRUE;
1073 }
1074
1075 static gboolean
1076 gst_hls_demux_schedule (GstHLSDemux * demux)
1077 {
1078   gfloat update_factor;
1079   gint count;
1080
1081   /* As defined in §6.3.4. Reloading the Playlist file:
1082    * "If the client reloads a Playlist file and finds that it has not
1083    * changed then it MUST wait for a period of time before retrying.  The
1084    * minimum delay is a multiple of the target duration.  This multiple is
1085    * 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter."
1086    */
1087   count = demux->client->update_failed_count;
1088   if (count < 3)
1089     update_factor = update_interval_factor[count];
1090   else
1091     update_factor = update_interval_factor[3];
1092
1093   /* schedule the next update using the target duration field of the
1094    * playlist */
1095   g_time_val_add (&demux->next_update,
1096       gst_m3u8_client_get_target_duration (demux->client)
1097       / GST_SECOND * G_USEC_PER_SEC * update_factor);
1098   GST_DEBUG_OBJECT (demux, "Next update scheduled at %s",
1099       g_time_val_to_iso8601 (&demux->next_update));
1100
1101   return TRUE;
1102 }
1103
1104 static gboolean
1105 gst_hls_demux_switch_playlist (GstHLSDemux * demux)
1106 {
1107   GTimeVal now;
1108   GstClockTime diff;
1109   gsize size;
1110   gint bitrate;
1111   GstFragment *fragment = g_queue_peek_tail (demux->queue);
1112   GstBuffer *buffer;
1113
1114   GST_M3U8_CLIENT_LOCK (demux->client);
1115   if (!demux->client->main->lists) {
1116     GST_M3U8_CLIENT_UNLOCK (demux->client);
1117     return TRUE;
1118   }
1119   GST_M3U8_CLIENT_UNLOCK (demux->client);
1120
1121   /* compare the time when the fragment was downloaded with the time when it was
1122    * scheduled */
1123   g_get_current_time (&now);
1124   diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (demux->next_update));
1125   buffer = gst_fragment_get_buffer (fragment);
1126   size = gst_buffer_get_size (buffer);
1127   bitrate = (size * 8) / ((double) diff / GST_SECOND);
1128
1129   GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d",
1130       size, GST_TIME_ARGS (diff), bitrate);
1131
1132   gst_buffer_unref (buffer);
1133   return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
1134 }
1135
1136 static gboolean
1137 gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching)
1138 {
1139   GstFragment *download;
1140   const gchar *next_fragment_uri;
1141   GstClockTime duration;
1142   GstClockTime timestamp;
1143   GstBuffer *buf;
1144   gboolean discont;
1145
1146   if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
1147           &next_fragment_uri, &duration, &timestamp)) {
1148     GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
1149     demux->end_of_playlist = TRUE;
1150     gst_task_start (demux->stream_task);
1151     return FALSE;
1152   }
1153
1154   GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
1155
1156   download = gst_uri_downloader_fetch_uri (demux->downloader,
1157       next_fragment_uri);
1158
1159   if (download == NULL)
1160     goto error;
1161
1162   buf = gst_fragment_get_buffer (download);
1163   GST_BUFFER_DURATION (buf) = duration;
1164   GST_BUFFER_PTS (buf) = timestamp;
1165
1166   /* We actually need to do this every time we switch bitrate */
1167   if (G_UNLIKELY (demux->do_typefind)) {
1168     GstCaps *caps = gst_fragment_get_caps (download);
1169
1170     if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
1171       gst_caps_replace (&demux->input_caps, caps);
1172       /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
1173       GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
1174           demux->input_caps);
1175       demux->do_typefind = FALSE;
1176     }
1177     gst_caps_unref (caps);
1178   } else {
1179     gst_fragment_set_caps (download, demux->input_caps);
1180   }
1181
1182   if (discont) {
1183     GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
1184     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1185   }
1186
1187   g_queue_push_tail (demux->queue, download);
1188   if (!caching) {
1189     GST_TASK_SIGNAL (demux->updates_task);
1190     gst_task_start (demux->stream_task);
1191   }
1192   return TRUE;
1193
1194 error:
1195   {
1196     gst_hls_demux_stop (demux);
1197     return FALSE;
1198   }
1199 }