elements: Use gst_pad_set_caps() instead of manual event fiddling
[platform/upstream/gstreamer.git] / gst / hls / gsthlsdemux.c
1 /* GStreamer
2  * Copyright (C) 2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
3  * Copyright (C) 2010 Andoni Morales Alastruey <ylatuya@gmail.com>
4  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
5  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
6  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
7  *
8  * Gsthlsdemux.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25 /**
26  * SECTION:element-hlsdemux
27  *
28  * HTTP Live Streaming demuxer element.
29  *
30  * <refsect2>
31  * <title>Example launch line</title>
32  * |[
33  * gst-launch souphttpsrc location=http://devimages.apple.com/iphone/samples/bipbop/gear4/prog_index.m3u8 ! hlsdemux ! decodebin2 ! ffmpegcolorspace ! videoscale ! autovideosink
34  * ]|
35  * </refsect2>
36  *
37  * Last reviewed on 2010-10-07
38  */
39
40 #ifdef HAVE_CONFIG_H
41 #  include "config.h"
42 #endif
43
44 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
45  * with newer GLib versions (>= 2.31.0) */
46 #define GLIB_DISABLE_DEPRECATION_WARNINGS
47
48 #include <string.h>
49 #include <gst/glib-compat-private.h>
50 #include "gsthlsdemux.h"
51
52 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
53     GST_PAD_SRC,
54     GST_PAD_SOMETIMES,
55     GST_STATIC_CAPS_ANY);
56
57 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
58     GST_PAD_SINK,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS ("application/x-hls"));
61
62 GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug);
63 #define GST_CAT_DEFAULT gst_hls_demux_debug
64
65 enum
66 {
67   PROP_0,
68
69   PROP_FRAGMENTS_CACHE,
70   PROP_BITRATE_LIMIT,
71   PROP_CONNECTION_SPEED,
72   PROP_LAST
73 };
74
75 static const float update_interval_factor[] = { 1, 0.5, 1.5, 3 };
76
77 #define DEFAULT_FRAGMENTS_CACHE 3
78 #define DEFAULT_FAILED_COUNT 3
79 #define DEFAULT_BITRATE_LIMIT 0.8
80 #define DEFAULT_CONNECTION_SPEED    0
81
82 /* GObject */
83 static void gst_hls_demux_set_property (GObject * object, guint prop_id,
84     const GValue * value, GParamSpec * pspec);
85 static void gst_hls_demux_get_property (GObject * object, guint prop_id,
86     GValue * value, GParamSpec * pspec);
87 static void gst_hls_demux_dispose (GObject * obj);
88
89 /* GstElement */
90 static GstStateChangeReturn
91 gst_hls_demux_change_state (GstElement * element, GstStateChange transition);
92
93 /* GstHLSDemux */
94 static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstObject * parent,
95     GstBuffer * buf);
96 static gboolean gst_hls_demux_sink_event (GstPad * pad, GstObject * parent,
97     GstEvent * event);
98 static gboolean gst_hls_demux_src_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102 static void gst_hls_demux_stream_loop (GstHLSDemux * demux);
103 static void gst_hls_demux_updates_loop (GstHLSDemux * demux);
104 static void gst_hls_demux_stop (GstHLSDemux * demux);
105 static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux);
106 static gboolean gst_hls_demux_schedule (GstHLSDemux * demux);
107 static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux);
108 static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
109     gboolean caching);
110 static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
111     gboolean update);
112 static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose);
113 static gboolean gst_hls_demux_set_location (GstHLSDemux * demux,
114     const gchar * uri);
115 static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
116
117 #define gst_hls_demux_parent_class parent_class
118 G_DEFINE_TYPE (GstHLSDemux, gst_hls_demux, GST_TYPE_ELEMENT);
119
120 static void
121 gst_hls_demux_dispose (GObject * obj)
122 {
123   GstHLSDemux *demux = GST_HLS_DEMUX (obj);
124
125   if (demux->stream_task) {
126     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
127       GST_DEBUG_OBJECT (demux, "Leaving streaming task");
128       gst_task_stop (demux->stream_task);
129       gst_task_join (demux->stream_task);
130     }
131     gst_object_unref (demux->stream_task);
132     g_rec_mutex_clear (&demux->stream_lock);
133     demux->stream_task = NULL;
134   }
135
136   if (demux->updates_task) {
137     if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
138       GST_DEBUG_OBJECT (demux, "Leaving updates task");
139       gst_task_stop (demux->updates_task);
140       gst_task_join (demux->updates_task);
141     }
142     gst_object_unref (demux->updates_task);
143     g_mutex_clear (&demux->updates_timed_lock);
144     g_rec_mutex_clear (&demux->updates_lock);
145     demux->updates_task = NULL;
146   }
147
148   if (demux->downloader != NULL) {
149     g_object_unref (demux->downloader);
150     demux->downloader = NULL;
151   }
152
153   gst_hls_demux_reset (demux, TRUE);
154
155   g_queue_free (demux->queue);
156
157   G_OBJECT_CLASS (parent_class)->dispose (obj);
158 }
159
160 static void
161 gst_hls_demux_class_init (GstHLSDemuxClass * klass)
162 {
163   GObjectClass *gobject_class;
164   GstElementClass *element_class;
165
166   gobject_class = (GObjectClass *) klass;
167   element_class = (GstElementClass *) klass;
168
169   gobject_class->set_property = gst_hls_demux_set_property;
170   gobject_class->get_property = gst_hls_demux_get_property;
171   gobject_class->dispose = gst_hls_demux_dispose;
172
173   g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
174       g_param_spec_uint ("fragments-cache", "Fragments cache",
175           "Number of fragments needed to be cached to start playing",
176           2, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
177           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178
179   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
180       g_param_spec_float ("bitrate-limit",
181           "Bitrate limit in %",
182           "Limit of the available bitrate to use when switching to alternates.",
183           0, 1, DEFAULT_BITRATE_LIMIT,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
187       g_param_spec_uint ("connection-speed", "Connection Speed",
188           "Network connection speed in kbps (0 = unknown)",
189           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
190           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   element_class->change_state = GST_DEBUG_FUNCPTR (gst_hls_demux_change_state);
193
194   gst_element_class_add_pad_template (element_class,
195       gst_static_pad_template_get (&srctemplate));
196
197   gst_element_class_add_pad_template (element_class,
198       gst_static_pad_template_get (&sinktemplate));
199
200   gst_element_class_set_details_simple (element_class,
201       "HLS Demuxer",
202       "Demuxer/URIList",
203       "HTTP Live Streaming demuxer",
204       "Marc-Andre Lureau <marcandre.lureau@gmail.com>\n"
205       "Andoni Morales Alastruey <ylatuya@gmail.com>");
206
207   GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
208       "hlsdemux element");
209 }
210
211 static void
212 gst_hls_demux_init (GstHLSDemux * demux)
213 {
214   /* sink pad */
215   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
216   gst_pad_set_chain_function (demux->sinkpad,
217       GST_DEBUG_FUNCPTR (gst_hls_demux_chain));
218   gst_pad_set_event_function (demux->sinkpad,
219       GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event));
220   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
221
222   /* Downloader */
223   demux->downloader = gst_uri_downloader_new ();
224
225   demux->do_typefind = TRUE;
226
227   /* Properties */
228   demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
229   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
230   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
231
232   demux->queue = g_queue_new ();
233
234   /* Updates task */
235   g_rec_mutex_init (&demux->updates_lock);
236   demux->updates_task =
237       gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux);
238   gst_task_set_lock (demux->updates_task, &demux->updates_lock);
239   g_mutex_init (&demux->updates_timed_lock);
240
241   /* Streaming task */
242   g_rec_mutex_init (&demux->stream_lock);
243   demux->stream_task =
244       gst_task_new ((GstTaskFunction) gst_hls_demux_stream_loop, demux);
245   gst_task_set_lock (demux->stream_task, &demux->stream_lock);
246 }
247
248 static void
249 gst_hls_demux_set_property (GObject * object, guint prop_id,
250     const GValue * value, GParamSpec * pspec)
251 {
252   GstHLSDemux *demux = GST_HLS_DEMUX (object);
253
254   switch (prop_id) {
255     case PROP_FRAGMENTS_CACHE:
256       demux->fragments_cache = g_value_get_uint (value);
257       break;
258     case PROP_BITRATE_LIMIT:
259       demux->bitrate_limit = g_value_get_float (value);
260       break;
261     case PROP_CONNECTION_SPEED:
262       demux->connection_speed = g_value_get_uint (value) * 1000;
263       break;
264     default:
265       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
266       break;
267   }
268 }
269
270 static void
271 gst_hls_demux_get_property (GObject * object, guint prop_id, GValue * value,
272     GParamSpec * pspec)
273 {
274   GstHLSDemux *demux = GST_HLS_DEMUX (object);
275
276   switch (prop_id) {
277     case PROP_FRAGMENTS_CACHE:
278       g_value_set_uint (value, demux->fragments_cache);
279       break;
280     case PROP_BITRATE_LIMIT:
281       g_value_set_float (value, demux->bitrate_limit);
282       break;
283     case PROP_CONNECTION_SPEED:
284       g_value_set_uint (value, demux->connection_speed / 1000);
285       break;
286     default:
287       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
288       break;
289   }
290 }
291
292 static GstStateChangeReturn
293 gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
294 {
295   GstStateChangeReturn ret;
296   GstHLSDemux *demux = GST_HLS_DEMUX (element);
297
298   switch (transition) {
299     case GST_STATE_CHANGE_READY_TO_PAUSED:
300       gst_hls_demux_reset (demux, FALSE);
301       break;
302     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
303       /* Start the streaming loop in paused only if we already received
304          the main playlist. It might have been stopped if we were in PAUSED
305          state and we filled our queue with enough cached fragments
306        */
307       if (gst_m3u8_client_get_uri (demux->client)[0] != '\0')
308         gst_task_start (demux->updates_task);
309       break;
310     default:
311       break;
312   }
313
314   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
315
316   switch (transition) {
317     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
318       gst_task_stop (demux->updates_task);
319       break;
320     case GST_STATE_CHANGE_PAUSED_TO_READY:
321       demux->cancelled = TRUE;
322       gst_hls_demux_stop (demux);
323       gst_task_join (demux->stream_task);
324       gst_hls_demux_reset (demux, FALSE);
325       break;
326     default:
327       break;
328   }
329   return ret;
330 }
331
332 static gboolean
333 gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
334 {
335   GstHLSDemux *demux;
336
337   demux = GST_HLS_DEMUX (parent);
338
339   switch (event->type) {
340     case GST_EVENT_SEEK:
341     {
342       gdouble rate;
343       GstFormat format;
344       GstSeekFlags flags;
345       GstSeekType start_type, stop_type;
346       gint64 start, stop;
347       GList *walk;
348       GstClockTime position, current_pos, target_pos;
349       gint current_sequence;
350       GstM3U8MediaFile *file;
351
352       GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
353
354       if (gst_m3u8_client_is_live (demux->client)) {
355         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
356         return FALSE;
357       }
358
359       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
360           &stop_type, &stop);
361
362       if (format != GST_FORMAT_TIME)
363         return FALSE;
364
365       GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
366           " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
367           GST_TIME_ARGS (stop));
368
369       GST_M3U8_CLIENT_LOCK (demux->client);
370       file = GST_M3U8_MEDIA_FILE (demux->client->current->files->data);
371       current_sequence = file->sequence;
372       current_pos = 0;
373       target_pos = (GstClockTime) start;
374       for (walk = demux->client->current->files; walk; walk = walk->next) {
375         file = walk->data;
376
377         current_sequence = file->sequence;
378         if (current_pos <= target_pos
379             && target_pos < current_pos + file->duration) {
380           break;
381         }
382         current_pos += file->duration;
383       }
384       GST_M3U8_CLIENT_UNLOCK (demux->client);
385
386       if (walk == NULL) {
387         GST_WARNING_OBJECT (demux, "Could not find seeked fragment");
388         return FALSE;
389       }
390
391       if (flags & GST_SEEK_FLAG_FLUSH) {
392         GST_DEBUG_OBJECT (demux, "sending flush start");
393         gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ());
394       }
395
396       demux->cancelled = TRUE;
397       gst_task_pause (demux->stream_task);
398       gst_uri_downloader_cancel (demux->downloader);
399       gst_task_stop (demux->updates_task);
400       gst_task_pause (demux->stream_task);
401
402       /* wait for streaming to finish */
403       g_rec_mutex_lock (&demux->stream_lock);
404
405       demux->need_cache = TRUE;
406       while (!g_queue_is_empty (demux->queue)) {
407         GstFragment *fragment = g_queue_pop_head (demux->queue);
408         g_object_unref (fragment);
409       }
410       g_queue_clear (demux->queue);
411
412       GST_M3U8_CLIENT_LOCK (demux->client);
413       GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence);
414       demux->client->sequence = current_sequence;
415       gst_m3u8_client_get_current_position (demux->client, &position);
416       demux->position_shift = start - position;
417       demux->need_segment = TRUE;
418       GST_M3U8_CLIENT_UNLOCK (demux->client);
419
420
421       if (flags & GST_SEEK_FLAG_FLUSH) {
422         GST_DEBUG_OBJECT (demux, "sending flush stop");
423         gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE));
424       }
425
426       demux->cancelled = FALSE;
427       gst_task_start (demux->stream_task);
428       g_rec_mutex_unlock (&demux->stream_lock);
429
430       return TRUE;
431     }
432     default:
433       break;
434   }
435
436   return gst_pad_event_default (pad, parent, event);
437 }
438
439 static gboolean
440 gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
441 {
442   GstHLSDemux *demux;
443   GstQuery *query;
444   gboolean ret;
445   gchar *uri;
446
447   demux = GST_HLS_DEMUX (parent);
448
449   switch (event->type) {
450     case GST_EVENT_EOS:{
451       gchar *playlist = NULL;
452
453       if (demux->playlist == NULL) {
454         GST_WARNING_OBJECT (demux, "Received EOS without a playlist.");
455         break;
456       }
457
458       GST_DEBUG_OBJECT (demux,
459           "Got EOS on the sink pad: main playlist fetched");
460
461       query = gst_query_new_uri ();
462       ret = gst_pad_peer_query (demux->sinkpad, query);
463       if (ret) {
464         gst_query_parse_uri (query, &uri);
465         gst_hls_demux_set_location (demux, uri);
466         g_free (uri);
467       }
468       gst_query_unref (query);
469
470       playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist);
471       demux->playlist = NULL;
472       if (playlist == NULL) {
473         GST_WARNING_OBJECT (demux, "Error validating first playlist.");
474       } else if (!gst_m3u8_client_update (demux->client, playlist)) {
475         /* In most cases, this will happen if we set a wrong url in the
476          * source element and we have received the 404 HTML response instead of
477          * the playlist */
478         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
479             (NULL));
480         return FALSE;
481       }
482
483       if (!ret && gst_m3u8_client_is_live (demux->client)) {
484         GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
485             ("Failed querying the playlist uri, "
486                 "required for live sources."), (NULL));
487         return FALSE;
488       }
489
490       gst_task_start (demux->stream_task);
491       gst_event_unref (event);
492       return TRUE;
493     }
494     case GST_EVENT_SEGMENT:
495       /* Swallow newsegments, we'll push our own */
496       gst_event_unref (event);
497       return TRUE;
498     default:
499       break;
500   }
501
502   return gst_pad_event_default (pad, parent, event);
503 }
504
505 static gboolean
506 gst_hls_demux_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
507 {
508   GstHLSDemux *hlsdemux;
509   gboolean ret = FALSE;
510
511   if (query == NULL)
512     return FALSE;
513
514   hlsdemux = GST_HLS_DEMUX (parent);
515
516   switch (query->type) {
517     case GST_QUERY_DURATION:{
518       GstClockTime duration = -1;
519       GstFormat fmt;
520
521       gst_query_parse_duration (query, &fmt, NULL);
522       if (fmt == GST_FORMAT_TIME) {
523         duration = gst_m3u8_client_get_duration (hlsdemux->client);
524         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
525           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
526           ret = TRUE;
527         }
528       }
529       GST_INFO_OBJECT (hlsdemux, "GST_QUERY_DURATION returns %s with duration %"
530           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
531       break;
532     }
533     case GST_QUERY_URI:
534       if (hlsdemux->client) {
535         /* FIXME: Do we answer with the variant playlist, with the current
536          * playlist or the the uri of the least downlowaded fragment? */
537         gst_query_set_uri (query, gst_m3u8_client_get_uri (hlsdemux->client));
538         ret = TRUE;
539       }
540       break;
541     case GST_QUERY_SEEKING:{
542       GstFormat fmt;
543       gint64 stop = -1;
544
545       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
546       GST_INFO_OBJECT (hlsdemux, "Received GST_QUERY_SEEKING with format %d",
547           fmt);
548       if (fmt == GST_FORMAT_TIME) {
549         GstClockTime duration;
550
551         duration = gst_m3u8_client_get_duration (hlsdemux->client);
552         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
553           stop = duration;
554
555         gst_query_set_seeking (query, fmt,
556             !gst_m3u8_client_is_live (hlsdemux->client), 0, stop);
557         ret = TRUE;
558         GST_INFO_OBJECT (hlsdemux, "GST_QUERY_SEEKING returning with stop : %"
559             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
560       }
561       break;
562     }
563     default:
564       /* Don't fordward queries upstream because of the special nature of this
565        * "demuxer", which relies on the upstream element only to be fed with the
566        * first playlist */
567       break;
568   }
569
570   return ret;
571 }
572
573 static GstFlowReturn
574 gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
575 {
576   GstHLSDemux *demux = GST_HLS_DEMUX (parent);
577
578   if (demux->playlist == NULL)
579     demux->playlist = buf;
580   else
581     demux->playlist = gst_buffer_append (demux->playlist, buf);
582
583   return GST_FLOW_OK;
584 }
585
586 static void
587 gst_hls_demux_stop (GstHLSDemux * demux)
588 {
589   gst_uri_downloader_cancel (demux->downloader);
590
591   if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) {
592     demux->stop_stream_task = TRUE;
593     gst_task_stop (demux->updates_task);
594     GST_TASK_SIGNAL (demux->updates_task);
595   }
596
597   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED)
598     gst_task_stop (demux->stream_task);
599 }
600
601 static void
602 switch_pads (GstHLSDemux * demux, GstCaps * newcaps)
603 {
604   GstPad *oldpad = demux->srcpad;
605
606   GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad,
607       newcaps);
608
609   /* First create and activate new pad */
610   demux->srcpad = gst_pad_new_from_static_template (&srctemplate, NULL);
611   gst_pad_set_event_function (demux->srcpad,
612       GST_DEBUG_FUNCPTR (gst_hls_demux_src_event));
613   gst_pad_set_query_function (demux->srcpad,
614       GST_DEBUG_FUNCPTR (gst_hls_demux_src_query));
615   gst_pad_set_element_private (demux->srcpad, demux);
616   gst_pad_set_active (demux->srcpad, TRUE);
617   gst_pad_push_event (demux->srcpad, gst_event_new_stream_start ());
618   gst_pad_set_caps (demux->srcpad, newcaps);
619   gst_element_add_pad (GST_ELEMENT (demux), demux->srcpad);
620
621   gst_element_no_more_pads (GST_ELEMENT (demux));
622
623   if (oldpad) {
624     /* Push out EOS */
625     gst_pad_push_event (oldpad, gst_event_new_eos ());
626     gst_pad_set_active (oldpad, FALSE);
627     gst_element_remove_pad (GST_ELEMENT (demux), oldpad);
628   }
629 }
630
631 static void
632 gst_hls_demux_stream_loop (GstHLSDemux * demux)
633 {
634   GstFragment *fragment;
635   GstBuffer *buf;
636   GstFlowReturn ret;
637   GstCaps *bufcaps, *srccaps = NULL;
638
639   /* Loop for the source pad task. The task is started when we have
640    * received the main playlist from the source element. It tries first to
641    * cache the first fragments and then it waits until it has more data in the
642    * queue. This task is woken up when we push a new fragment to the queue or
643    * when we reached the end of the playlist  */
644
645   if (G_UNLIKELY (demux->need_cache)) {
646     if (!gst_hls_demux_cache_fragments (demux))
647       goto cache_error;
648
649     /* we can start now the updates thread (only if on playing) */
650     if (GST_STATE (demux) == GST_STATE_PLAYING)
651       gst_task_start (demux->updates_task);
652     GST_INFO_OBJECT (demux, "First fragments cached successfully");
653   }
654
655   if (g_queue_is_empty (demux->queue)) {
656     if (demux->end_of_playlist)
657       goto end_of_playlist;
658
659     goto pause_task;
660   }
661
662   fragment = g_queue_pop_head (demux->queue);
663   buf = gst_fragment_get_buffer (fragment);
664
665   /* Figure out if we need to create/switch pads */
666   if (G_LIKELY (demux->srcpad))
667     srccaps = gst_pad_get_current_caps (demux->srcpad);
668   bufcaps = gst_fragment_get_caps (fragment);
669   if (G_UNLIKELY (!srccaps || !gst_caps_is_equal_fixed (bufcaps, srccaps)
670           || demux->need_segment)) {
671     switch_pads (demux, bufcaps);
672     demux->need_segment = TRUE;
673   }
674   gst_caps_unref (bufcaps);
675   if (G_LIKELY (srccaps))
676     gst_caps_unref (srccaps);
677   g_object_unref (fragment);
678
679   if (demux->need_segment) {
680     GstSegment segment;
681     GstClockTime start = GST_BUFFER_PTS (buf);
682
683     start += demux->position_shift;
684     /* And send a newsegment */
685     GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
686         GST_TIME_FORMAT, GST_TIME_ARGS (start));
687     gst_segment_init (&segment, GST_FORMAT_TIME);
688     segment.start = start;
689     segment.time = start;
690     gst_pad_push_event (demux->srcpad, gst_event_new_segment (&segment));
691     demux->need_segment = FALSE;
692     demux->position_shift = 0;
693   }
694
695   ret = gst_pad_push (demux->srcpad, buf);
696   if (ret != GST_FLOW_OK)
697     goto error_pushing;
698
699   return;
700
701 end_of_playlist:
702   {
703     GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS");
704     gst_pad_push_event (demux->srcpad, gst_event_new_eos ());
705     gst_hls_demux_stop (demux);
706     return;
707   }
708
709 cache_error:
710   {
711     gst_task_pause (demux->stream_task);
712     if (!demux->cancelled) {
713       GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
714           ("Could not cache the first fragments"), (NULL));
715       gst_hls_demux_stop (demux);
716     }
717     return;
718   }
719
720 error_pushing:
721   {
722     /* FIXME: handle error */
723     GST_DEBUG_OBJECT (demux, "Error pushing buffer: %s... stopping task",
724         gst_flow_get_name (ret));
725     gst_hls_demux_stop (demux);
726     return;
727   }
728
729 pause_task:
730   {
731     gst_task_pause (demux->stream_task);
732     return;
733   }
734 }
735
736 static void
737 gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose)
738 {
739   demux->need_cache = TRUE;
740   demux->end_of_playlist = FALSE;
741   demux->cancelled = FALSE;
742   demux->do_typefind = TRUE;
743
744   if (demux->input_caps) {
745     gst_caps_unref (demux->input_caps);
746     demux->input_caps = NULL;
747   }
748
749   if (demux->playlist) {
750     gst_buffer_unref (demux->playlist);
751     demux->playlist = NULL;
752   }
753
754   if (demux->client) {
755     gst_m3u8_client_free (demux->client);
756     demux->client = NULL;
757   }
758
759   if (!dispose) {
760     demux->client = gst_m3u8_client_new ("");
761   }
762
763   while (!g_queue_is_empty (demux->queue)) {
764     GstFragment *fragment = g_queue_pop_head (demux->queue);
765     g_object_unref (fragment);
766   }
767   g_queue_clear (demux->queue);
768
769   demux->position_shift = 0;
770   demux->need_segment = TRUE;
771 }
772
773 static gboolean
774 gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri)
775 {
776   if (demux->client)
777     gst_m3u8_client_free (demux->client);
778   demux->client = gst_m3u8_client_new (uri);
779   GST_INFO_OBJECT (demux, "Changed location: %s", uri);
780   return TRUE;
781 }
782
783 void
784 gst_hls_demux_updates_loop (GstHLSDemux * demux)
785 {
786   /* Loop for the updates. It's started when the first fragments are cached and
787    * schedules the next update of the playlist (for lives sources) and the next
788    * update of fragments. When a new fragment is downloaded, it compares the
789    * download time with the next scheduled update to check if we can or should
790    * switch to a different bitrate */
791
792   /* block until the next scheduled update or the signal to quit this thread */
793   g_mutex_lock (&demux->updates_timed_lock);
794   GST_DEBUG_OBJECT (demux, "Started updates task");
795   while (TRUE) {
796     /* schedule the next update */
797     gst_hls_demux_schedule (demux);
798
799     /*  block until the next scheduled update or the signal to quit this thread */
800     if (g_cond_timed_wait (GST_TASK_GET_COND (demux->updates_task),
801             &demux->updates_timed_lock, &demux->next_update)) {
802       goto quit;
803     }
804     /* update the playlist for live sources */
805     if (gst_m3u8_client_is_live (demux->client)) {
806       if (!gst_hls_demux_update_playlist (demux, TRUE)) {
807         demux->client->update_failed_count++;
808         if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
809           GST_WARNING_OBJECT (demux, "Could not update the playlist");
810           continue;
811         } else {
812           GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
813               ("Could not update the playlist"), (NULL));
814           goto quit;
815         }
816       }
817     }
818
819     /* if it's a live source and the playlist couldn't be updated, there aren't
820      * more fragments in the playlist, so we just wait for the next schedulled
821      * update */
822     if (gst_m3u8_client_is_live (demux->client) &&
823         demux->client->update_failed_count > 0) {
824       GST_WARNING_OBJECT (demux,
825           "The playlist hasn't been updated, failed count is %d",
826           demux->client->update_failed_count);
827       continue;
828     }
829
830     /* fetch the next fragment */
831     if (g_queue_is_empty (demux->queue)) {
832       if (!gst_hls_demux_get_next_fragment (demux, FALSE)) {
833         if (!demux->end_of_playlist && !demux->cancelled) {
834           demux->client->update_failed_count++;
835           if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
836             GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
837             continue;
838           } else {
839             GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
840                 ("Could not fetch the next fragment"), (NULL));
841             goto quit;
842           }
843         }
844       } else {
845         demux->client->update_failed_count = 0;
846
847         /* try to switch to another bitrate if needed */
848         gst_hls_demux_switch_playlist (demux);
849       }
850     }
851   }
852
853 quit:
854   {
855     GST_DEBUG_OBJECT (demux, "Stopped updates task");
856     gst_hls_demux_stop (demux);
857     g_mutex_unlock (&demux->updates_timed_lock);
858   }
859 }
860
861 static gboolean
862 gst_hls_demux_cache_fragments (GstHLSDemux * demux)
863 {
864   gint i;
865
866   /* If this playlist is a variant playlist, select the first one
867    * and update it */
868   if (gst_m3u8_client_has_variant_playlist (demux->client)) {
869     GstM3U8 *child = NULL;
870
871     if (demux->connection_speed == 0) {
872
873       GST_M3U8_CLIENT_LOCK (demux->client);
874       child = demux->client->main->current_variant->data;
875       GST_M3U8_CLIENT_UNLOCK (demux->client);
876     } else {
877       GList *tmp = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
878           demux->connection_speed);
879
880       child = GST_M3U8 (tmp->data);
881     }
882
883     gst_m3u8_client_set_current (demux->client, child);
884     if (!gst_hls_demux_update_playlist (demux, FALSE)) {
885       GST_ERROR_OBJECT (demux, "Could not fetch the child playlist %s",
886           child->uri);
887       return FALSE;
888     }
889   }
890
891   if (!gst_m3u8_client_is_live (demux->client)) {
892     GstClockTime duration = gst_m3u8_client_get_duration (demux->client);
893
894     GST_DEBUG_OBJECT (demux, "Sending duration message : %" GST_TIME_FORMAT,
895         GST_TIME_ARGS (duration));
896     if (duration != GST_CLOCK_TIME_NONE)
897       gst_element_post_message (GST_ELEMENT (demux),
898           gst_message_new_duration (GST_OBJECT (demux),
899               GST_FORMAT_TIME, duration));
900   }
901
902   /* Cache the first fragments */
903   for (i = 0; i < demux->fragments_cache; i++) {
904     gst_element_post_message (GST_ELEMENT (demux),
905         gst_message_new_buffering (GST_OBJECT (demux),
906             100 * i / demux->fragments_cache));
907     g_get_current_time (&demux->next_update);
908     if (!gst_hls_demux_get_next_fragment (demux, TRUE)) {
909       if (demux->end_of_playlist)
910         break;
911       if (!demux->cancelled)
912         GST_ERROR_OBJECT (demux, "Error caching the first fragments");
913       return FALSE;
914     }
915     /* make sure we stop caching fragments if something cancelled it */
916     if (demux->cancelled)
917       return FALSE;
918     gst_hls_demux_switch_playlist (demux);
919   }
920   gst_element_post_message (GST_ELEMENT (demux),
921       gst_message_new_buffering (GST_OBJECT (demux), 100));
922
923   g_get_current_time (&demux->next_update);
924
925   demux->need_cache = FALSE;
926   return TRUE;
927
928 }
929
930 static gchar *
931 gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf)
932 {
933   GstMapInfo info;
934   gchar *playlist;
935
936   if (!gst_buffer_map (buf, &info, GST_MAP_READ))
937     return NULL;
938
939   if (!g_utf8_validate ((gchar *) info.data, info.size, NULL))
940     goto validate_error;
941
942   /* alloc size + 1 to end with a null character */
943   playlist = g_malloc0 (info.size + 1);
944   memcpy (playlist, info.data, info.size + 1);
945
946   gst_buffer_unmap (buf, &info);
947   gst_buffer_unref (buf);
948   return playlist;
949
950 validate_error:
951   gst_buffer_unmap (buf, &info);
952   gst_buffer_unref (buf);
953   return NULL;
954 }
955
956 static gboolean
957 gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update)
958 {
959   GstFragment *download;
960   GstBuffer *buf;
961   gchar *playlist;
962   gboolean updated = FALSE;
963
964   const gchar *uri = gst_m3u8_client_get_current_uri (demux->client);
965
966   download = gst_uri_downloader_fetch_uri (demux->downloader, uri);
967
968   if (download == NULL)
969     return FALSE;
970
971   buf = gst_fragment_get_buffer (download);
972   playlist = gst_hls_src_buf_to_utf8_playlist (buf);
973   g_object_unref (download);
974
975   if (playlist == NULL) {
976     GST_WARNING_OBJECT (demux, "Couldn't not validate playlist encoding");
977     return FALSE;
978   }
979
980   updated = gst_m3u8_client_update (demux->client, playlist);
981
982   /*  If it's a live source, do not let the sequence number go beyond
983    * three fragments before the end of the list */
984   if (updated && update == FALSE && demux->client->current &&
985       gst_m3u8_client_is_live (demux->client)) {
986     guint last_sequence;
987
988     GST_M3U8_CLIENT_LOCK (demux->client);
989     last_sequence =
990         GST_M3U8_MEDIA_FILE (g_list_last (demux->client->current->
991             files)->data)->sequence;
992
993     if (demux->client->sequence >= last_sequence - 3) {
994       GST_DEBUG_OBJECT (demux, "Sequence is beyond playlist. Moving back to %d",
995           last_sequence - 3);
996       demux->need_segment = TRUE;
997       demux->client->sequence = last_sequence - 3;
998     }
999     GST_M3U8_CLIENT_UNLOCK (demux->client);
1000   }
1001
1002   return updated;
1003 }
1004
1005 static gboolean
1006 gst_hls_demux_change_playlist (GstHLSDemux * demux, guint max_bitrate)
1007 {
1008   GList *previous_variant, *current_variant;
1009   gint old_bandwidth, new_bandwidth;
1010
1011   /* If user specifies a connection speed never use a playlist with a bandwidth
1012    * superior than it */
1013   if (demux->connection_speed != 0 && max_bitrate > demux->connection_speed)
1014     max_bitrate = demux->connection_speed;
1015
1016   previous_variant = demux->client->main->current_variant;
1017   current_variant = gst_m3u8_client_get_playlist_for_bitrate (demux->client,
1018       max_bitrate);
1019
1020 retry_failover_protection:
1021   old_bandwidth = GST_M3U8 (previous_variant->data)->bandwidth;
1022   new_bandwidth = GST_M3U8 (current_variant->data)->bandwidth;
1023
1024   /* Don't do anything else if the playlist is the same */
1025   if (new_bandwidth == old_bandwidth) {
1026     return TRUE;
1027   }
1028
1029   demux->client->main->current_variant = current_variant;
1030   GST_M3U8_CLIENT_UNLOCK (demux->client);
1031
1032   gst_m3u8_client_set_current (demux->client, current_variant->data);
1033
1034   GST_INFO_OBJECT (demux, "Client was on %dbps, max allowed is %dbps, switching"
1035       " to bitrate %dbps", old_bandwidth, max_bitrate, new_bandwidth);
1036
1037   if (gst_hls_demux_update_playlist (demux, FALSE)) {
1038     GstStructure *s;
1039
1040     s = gst_structure_new ("playlist",
1041         "uri", G_TYPE_STRING, gst_m3u8_client_get_current_uri (demux->client),
1042         "bitrate", G_TYPE_INT, new_bandwidth, NULL);
1043     gst_element_post_message (GST_ELEMENT_CAST (demux),
1044         gst_message_new_element (GST_OBJECT_CAST (demux), s));
1045   } else {
1046     GList *failover = NULL;
1047
1048     GST_INFO_OBJECT (demux, "Unable to update playlist. Switching back");
1049     GST_M3U8_CLIENT_LOCK (demux->client);
1050
1051     failover = g_list_previous (current_variant);
1052     if (failover && new_bandwidth == GST_M3U8 (failover->data)->bandwidth) {
1053       current_variant = failover;
1054       goto retry_failover_protection;
1055     }
1056
1057     demux->client->main->current_variant = previous_variant;
1058     GST_M3U8_CLIENT_UNLOCK (demux->client);
1059     gst_m3u8_client_set_current (demux->client, previous_variant->data);
1060     /*  Try a lower bitrate (or stop if we just tried the lowest) */
1061     if (new_bandwidth ==
1062         GST_M3U8 (g_list_first (demux->client->main->lists)->data)->bandwidth)
1063       return FALSE;
1064     else
1065       return gst_hls_demux_change_playlist (demux, new_bandwidth - 1);
1066   }
1067
1068   /* Force typefinding since we might have changed media type */
1069   demux->do_typefind = TRUE;
1070
1071   return TRUE;
1072 }
1073
1074 static gboolean
1075 gst_hls_demux_schedule (GstHLSDemux * demux)
1076 {
1077   gfloat update_factor;
1078   gint count;
1079
1080   /* As defined in §6.3.4. Reloading the Playlist file:
1081    * "If the client reloads a Playlist file and finds that it has not
1082    * changed then it MUST wait for a period of time before retrying.  The
1083    * minimum delay is a multiple of the target duration.  This multiple is
1084    * 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter."
1085    */
1086   count = demux->client->update_failed_count;
1087   if (count < 3)
1088     update_factor = update_interval_factor[count];
1089   else
1090     update_factor = update_interval_factor[3];
1091
1092   /* schedule the next update using the target duration field of the
1093    * playlist */
1094   g_time_val_add (&demux->next_update,
1095       gst_m3u8_client_get_target_duration (demux->client)
1096       / GST_SECOND * G_USEC_PER_SEC * update_factor);
1097   GST_DEBUG_OBJECT (demux, "Next update scheduled at %s",
1098       g_time_val_to_iso8601 (&demux->next_update));
1099
1100   return TRUE;
1101 }
1102
1103 static gboolean
1104 gst_hls_demux_switch_playlist (GstHLSDemux * demux)
1105 {
1106   GTimeVal now;
1107   GstClockTime diff;
1108   gsize size;
1109   gint bitrate;
1110   GstFragment *fragment = g_queue_peek_tail (demux->queue);
1111   GstBuffer *buffer;
1112
1113   GST_M3U8_CLIENT_LOCK (demux->client);
1114   if (!demux->client->main->lists) {
1115     GST_M3U8_CLIENT_UNLOCK (demux->client);
1116     return TRUE;
1117   }
1118   GST_M3U8_CLIENT_UNLOCK (demux->client);
1119
1120   /* compare the time when the fragment was downloaded with the time when it was
1121    * scheduled */
1122   g_get_current_time (&now);
1123   diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (demux->next_update));
1124   buffer = gst_fragment_get_buffer (fragment);
1125   size = gst_buffer_get_size (buffer);
1126   bitrate = (size * 8) / ((double) diff / GST_SECOND);
1127
1128   GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d",
1129       size, GST_TIME_ARGS (diff), bitrate);
1130
1131   gst_buffer_unref (buffer);
1132   return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit);
1133 }
1134
1135 static gboolean
1136 gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching)
1137 {
1138   GstFragment *download;
1139   const gchar *next_fragment_uri;
1140   GstClockTime duration;
1141   GstClockTime timestamp;
1142   GstBuffer *buf;
1143   gboolean discont;
1144
1145   if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
1146           &next_fragment_uri, &duration, &timestamp)) {
1147     GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments");
1148     demux->end_of_playlist = TRUE;
1149     gst_task_start (demux->stream_task);
1150     return FALSE;
1151   }
1152
1153   GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
1154
1155   download = gst_uri_downloader_fetch_uri (demux->downloader,
1156       next_fragment_uri);
1157
1158   if (download == NULL)
1159     goto error;
1160
1161   buf = gst_fragment_get_buffer (download);
1162   GST_BUFFER_DURATION (buf) = duration;
1163   GST_BUFFER_PTS (buf) = timestamp;
1164
1165   /* We actually need to do this every time we switch bitrate */
1166   if (G_UNLIKELY (demux->do_typefind)) {
1167     GstCaps *caps = gst_fragment_get_caps (download);
1168
1169     if (!demux->input_caps || !gst_caps_is_equal (caps, demux->input_caps)) {
1170       gst_caps_replace (&demux->input_caps, caps);
1171       /* gst_pad_set_caps (demux->srcpad, demux->input_caps); */
1172       GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
1173           demux->input_caps);
1174       demux->do_typefind = FALSE;
1175     }
1176     gst_caps_unref (caps);
1177   } else {
1178     gst_fragment_set_caps (download, demux->input_caps);
1179   }
1180
1181   if (discont) {
1182     GST_DEBUG_OBJECT (demux, "Marking fragment as discontinuous");
1183     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1184   }
1185
1186   g_queue_push_tail (demux->queue, download);
1187   if (!caching) {
1188     GST_TASK_SIGNAL (demux->updates_task);
1189     gst_task_start (demux->stream_task);
1190   }
1191   return TRUE;
1192
1193 error:
1194   {
1195     gst_hls_demux_stop (demux);
1196     return FALSE;
1197   }
1198 }