adaptivedemux2: Add state checks and clean up obsolete variables
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
63
64   stream->fragment_bitrates =
65       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
66
67   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
68
69   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
70 }
71
72 /* must be called with manifest_lock taken.
73  * It will temporarily drop the manifest_lock in order to join the task.
74  * It will join only the old_streams (the demux->streams are joined by
75  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
76  * called)
77  */
78 static void
79 gst_adaptive_demux2_stream_finalize (GObject * object)
80 {
81   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
82
83   GST_LOG_OBJECT (object, "Finalizing");
84
85   if (stream->download_request)
86     download_request_unref (stream->download_request);
87
88   g_clear_error (&stream->last_error);
89
90   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
91
92   if (stream->pending_events) {
93     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
94     stream->pending_events = NULL;
95   }
96
97   if (stream->parsebin_sink) {
98     gst_object_unref (stream->parsebin_sink);
99     stream->parsebin_sink = NULL;
100   }
101
102   if (stream->pad_added_id)
103     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
104   if (stream->pad_removed_id)
105     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
106
107   if (stream->parsebin != NULL) {
108     GST_LOG_OBJECT (stream, "Removing parsebin");
109     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
110     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
111     gst_object_unref (stream->parsebin);
112     stream->parsebin = NULL;
113   }
114
115   g_free (stream->fragment_bitrates);
116
117   g_list_free_full (stream->tracks,
118       (GDestroyNotify) gst_adaptive_demux_track_unref);
119
120   if (stream->pending_caps)
121     gst_caps_unref (stream->pending_caps);
122
123   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
124   g_clear_pointer (&stream->stream_collection, gst_object_unref);
125
126   G_OBJECT_CLASS (parent_class)->finalize (object);
127 }
128
129 /**
130  * gst_adaptive_demux2_stream_add_track:
131  * @stream: A #GstAdaptiveDemux2Stream
132  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
133  *
134  * This function is called when a subclass knows of a target @track that this
135  * @stream can provide.
136  */
137 gboolean
138 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
139     GstAdaptiveDemuxTrack * track)
140 {
141   g_return_val_if_fail (track != NULL, FALSE);
142
143   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
144       track->stream_id);
145   if (g_list_find (stream->tracks, track)) {
146     GST_DEBUG_OBJECT (stream->demux,
147         "track '%s' already handled by this stream", track->stream_id);
148     return FALSE;
149   }
150
151   if (stream->demux->buffering_low_watermark_time)
152     track->buffering_threshold = stream->demux->buffering_low_watermark_time;
153   else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
154     track->buffering_threshold =
155         MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
156   else {
157     /* Using a starting default, can be overriden later in
158      * ::update_stream_info() */
159     GST_DEBUG_OBJECT (stream,
160         "Setting default 10s buffering threshold on new track");
161     track->buffering_threshold = 10 * GST_SECOND;
162   }
163
164   stream->tracks =
165       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
166   if (stream->demux) {
167     g_assert (stream->period);
168     gst_adaptive_demux_period_add_track (stream->period, track);
169   }
170   return TRUE;
171 }
172
173 static gboolean
174 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
175 static gboolean
176 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
177 static void
178 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
179     stream);
180 static GstFlowReturn
181 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
182     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
183     gint64 end);
184
185 #ifndef GST_DISABLE_GST_DEBUG
186 static const char *
187 uritype (GstAdaptiveDemux2Stream * s)
188 {
189   if (s->downloading_header)
190     return "header";
191   if (s->downloading_index)
192     return "index";
193   return "fragment";
194 }
195 #endif
196
197 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
198 static gboolean
199 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
200 {
201   GstAdaptiveDemux *demux = stream->demux;
202   DownloadRequest *request = stream->download_request;
203   GstFlowReturn ret;
204
205   gchar *uri = request->uri;
206   gint64 range_start = request->range_start;
207   gint64 range_end = request->range_end;
208   gint64 chunk_size;
209   gint64 chunk_end;
210
211   if (range_end == -1)
212     return FALSE;               /* This was a request to the end, no more to load */
213
214   /* The size of the request that just completed: */
215   chunk_size = range_end + 1 - range_start;
216
217   if (request->content_received < chunk_size)
218     return FALSE;               /* Short read - we're done */
219
220   /* Accumulate the data we just fetched, to figure out the next
221    * request start position and update the target chunk size from
222    * the updated stream fragment info */
223   range_start += chunk_size;
224   range_end = stream->fragment.range_end;
225   chunk_size = stream->fragment.chunk_size;
226
227   if (chunk_size == 0)
228     return FALSE;               /* Sub-class doesn't want another chunk */
229
230   /* HTTP ranges are inclusive for the end */
231   if (chunk_size != -1) {
232     chunk_end = range_start + chunk_size - 1;
233     if (range_end != -1 && range_end < chunk_end)
234       chunk_end = range_end;
235   } else {
236     chunk_end = range_end;
237   }
238
239   GST_DEBUG_OBJECT (stream,
240       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
241       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
242
243   ret =
244       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
245       range_start, chunk_end);
246   if (ret != GST_FLOW_OK) {
247     GST_DEBUG_OBJECT (stream,
248         "Stopping stream due to begin download failure - ret %s",
249         gst_flow_get_name (ret));
250     gst_adaptive_demux2_stream_stop (stream);
251     return FALSE;
252   }
253
254   return TRUE;
255 }
256
257 static void
258 drain_inactive_tracks (GstAdaptiveDemux * demux,
259     GstAdaptiveDemux2Stream * stream)
260 {
261   GList *iter;
262
263   TRACKS_LOCK (demux);
264   for (iter = stream->tracks; iter; iter = iter->next) {
265     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
266     if (!track->selected) {
267       gst_adaptive_demux_track_drain_to (track,
268           demux->priv->global_output_position);
269     }
270   }
271
272   TRACKS_UNLOCK (demux);
273 }
274
275 /* Called to complete a download, either due to failure or completion
276  * Should set up the next download if necessary */
277 static void
278 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
279     stream, GstFlowReturn ret, GError * err)
280 {
281   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
282   GstAdaptiveDemux *demux = stream->demux;
283
284   GST_DEBUG_OBJECT (stream,
285       "%s download finish: %d %s - err: %p", uritype (stream), ret,
286       gst_flow_get_name (ret), err);
287
288   stream->download_finished = TRUE;
289
290   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
291    * which can look at the last_ret - so make sure it's stored before calling that.
292    * Also, for not-linked or other errors passed in that are going to make
293    * this stream stop, we'll need to store it */
294   stream->last_ret = ret;
295
296   if (err) {
297     g_clear_error (&stream->last_error);
298     stream->last_error = g_error_copy (err);
299   }
300
301   /* For actual errors, stop now, no need to call finish_fragment and get
302    * confused if it returns a non-error status, but if EOS was passed in,
303    * continue and check whether finish_fragment() says we've finished
304    * the whole manifest or just this fragment */
305   if (ret < 0 && ret != GST_FLOW_EOS) {
306     GST_INFO_OBJECT (stream,
307         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
308     gst_adaptive_demux2_stream_stop (stream);
309     return;
310   }
311
312   /* Handle all the possible flow returns here: */
313   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
314     /* We lost sync, seek back to live and return */
315     GST_WARNING_OBJECT (stream, "Lost sync when downloading");
316     gst_adaptive_demux_handle_lost_sync (demux);
317     return;
318   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
319     /* The sub-class wants to stop the fragment immediately */
320     stream->fragment.finished = TRUE;
321     ret = klass->finish_fragment (demux, stream);
322
323     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
324         gst_flow_get_name (ret));
325   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
326     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
327     /* Just mark the fragment as finished */
328     stream->fragment.finished = TRUE;
329     ret = GST_FLOW_OK;
330   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
331       || !klass->need_another_chunk (stream)
332       || stream->fragment.chunk_size == 0) {
333     stream->fragment.finished = TRUE;
334     ret = klass->finish_fragment (stream->demux, stream);
335
336     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
337         gst_flow_get_name (ret));
338   } else if (stream->fragment.chunk_size != 0
339       && schedule_another_chunk (stream)) {
340     /* Another download has already begun, no need to queue anything below */
341     return;
342   }
343
344   /* For HLS, we might be enqueueing data into tracks that aren't
345    * selected. Drain those ones out */
346   drain_inactive_tracks (stream->demux, stream);
347
348   /* Now that we've called finish_fragment we can clear these flags the
349    * sub-class might have checked */
350   if (stream->downloading_header) {
351     stream->need_header = FALSE;
352     stream->downloading_header = FALSE;
353   } else if (stream->downloading_index) {
354     stream->need_index = FALSE;
355     stream->downloading_index = FALSE;
356     /* Restart the fragment again now that header + index were loaded
357      * so that get_fragment_info() will be called again */
358     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
359   } else {
360     /* Finishing a fragment data download. Try for another */
361     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
362   }
363
364   /* if GST_FLOW_EOS was passed in that means this download is finished,
365    * but it's the result returned from finish_fragment() we really care
366    * about, as that tells us if the manifest has run out of fragments
367    * to load */
368   if (ret == GST_FLOW_EOS) {
369     stream->last_ret = ret;
370
371     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
372     return;
373   }
374
375   /* Now finally, if ret is anything other than success, we should stop this
376    * stream */
377   if (ret < 0) {
378     GST_DEBUG_OBJECT (stream,
379         "Stopping stream due to finish fragment ret %s",
380         gst_flow_get_name (ret));
381     gst_adaptive_demux2_stream_stop (stream);
382     return;
383   }
384
385   /* Clear the last_ret marker before starting a fresh download */
386   stream->last_ret = GST_FLOW_OK;
387
388   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
389   stream->pending_cb_id =
390       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
391       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
392       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
393 }
394
395 /* Must be called from the scheduler context */
396 void
397 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
398     GError * err)
399 {
400   GstAdaptiveDemux *demux = stream->demux;
401
402   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
403     return;
404
405   downloadhelper_cancel_request (demux->download_helper,
406       stream->download_request);
407
408   /* cancellation is async, so recycle our download request to avoid races */
409   download_request_unref (stream->download_request);
410   stream->download_request = download_request_new ();
411
412   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
413       err);
414 }
415
416 static void
417 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
418     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
419 {
420   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
421   GstClockTime offset =
422       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
423
424   stream->parse_segment = demux->segment;
425
426   /* The demuxer segment is just built from seek events, but for each stream
427    * we have to adjust segments according to the current period and the
428    * stream specific presentation time offset.
429    *
430    * For each period, buffer timestamps start again from 0. Additionally the
431    * buffer timestamps are shifted by the stream specific presentation time
432    * offset, so the first buffer timestamp of a period is 0 + presentation
433    * time offset. If the stream contains timestamps itself, this is also
434    * supposed to be the presentation time stored inside the stream.
435    *
436    * The stream time over periods is supposed to be continuous, that is the
437    * buffer timestamp 0 + presentation time offset should map to the start
438    * time of the current period.
439    *
440    *
441    * The adjustment of the stream segments as such works the following.
442    *
443    * If the demuxer segment start is bigger than the period start, this
444    * means that we have to drop some media at the beginning of the current
445    * period, e.g. because a seek into the middle of the period has
446    * happened. The amount of media to drop is the difference between the
447    * period start and the demuxer segment start, and as each period starts
448    * again from 0, this difference is going to be the actual stream's
449    * segment start. As all timestamps of the stream are shifted by the
450    * presentation time offset, we will also have to move the segment start
451    * by that offset.
452    *
453    * Likewise, the demuxer segment stop value is adjusted in the same
454    * fashion.
455    *
456    * Now the running time and stream time at the stream's segment start has
457    * to be the one that is stored inside the demuxer's segment, which means
458    * that segment.base and segment.time have to be copied over (done just
459    * above)
460    *
461    *
462    * If the demuxer segment start is smaller than the period start time,
463    * this means that the whole period is inside the segment. As each period
464    * starts timestamps from 0, and additionally timestamps are shifted by
465    * the presentation time offset, the stream's first timestamp (and as such
466    * the stream's segment start) has to be the presentation time offset.
467    * The stream time at the segment start is supposed to be the stream time
468    * of the period start according to the demuxer segment, so the stream
469    * segment's time would be set to that. The same goes for the stream
470    * segment's base, which is supposed to be the running time of the period
471    * start according to the demuxer's segment.
472    *
473    * The same logic applies for negative rates with the segment stop and
474    * the period stop time (which gets clamped).
475    *
476    *
477    * For the first case where not the complete period is inside the segment,
478    * the segment time and base as calculated by the second case would be
479    * equivalent.
480    */
481   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
482       &demux->segment);
483   GST_DEBUG_OBJECT (demux,
484       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
485       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
486   /* note for readers:
487    * Since stream->parse_segment is initially a copy of demux->segment,
488    * only the values that need updating are modified below. */
489   if (first_and_live) {
490     /* If first and live, demuxer did seek to the current position already */
491     stream->parse_segment.start = demux->segment.start - period_start + offset;
492     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
493       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
494     /* FIXME : Do we need to handle negative rates for this ? */
495     stream->parse_segment.position = stream->parse_segment.start;
496   } else if (demux->segment.start > period_start) {
497     /* seek within a period */
498     stream->parse_segment.start = demux->segment.start - period_start + offset;
499     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
500       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
501     if (stream->parse_segment.rate >= 0)
502       stream->parse_segment.position = offset;
503     else
504       stream->parse_segment.position = stream->parse_segment.stop;
505   } else {
506     stream->parse_segment.start = offset;
507     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
508       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
509     if (stream->parse_segment.rate >= 0) {
510       stream->parse_segment.position = offset;
511       stream->parse_segment.base =
512           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
513           period_start);
514     } else {
515       stream->parse_segment.position = stream->parse_segment.stop;
516       stream->parse_segment.base =
517           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
518           period_start + demux->segment.stop - demux->segment.start);
519     }
520     stream->parse_segment.time =
521         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
522         period_start);
523   }
524
525   stream->send_segment = TRUE;
526
527   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
528       &stream->parse_segment);
529 }
530
531 /* Segment lock hold */
532 static void
533 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
534     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
535 {
536   GstClockTimeDiff pos;
537
538   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
539       GST_STIME_ARGS (stream->fragment.stream_time));
540
541   pos = stream->fragment.stream_time;
542
543   if (GST_CLOCK_STIME_IS_VALID (pos)) {
544     GstClockTime offset =
545         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
546
547     pos += offset;
548
549     if (pos < 0) {
550       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
551       pos = 0;
552     }
553
554     GST_BUFFER_PTS (buffer) = pos;
555   } else {
556     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
557   }
558
559   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
560       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
561 }
562
563 /* Must be called from the scheduler context */
564 GstFlowReturn
565 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
566     GstBuffer * buffer)
567 {
568   GstAdaptiveDemux *demux = stream->demux;
569   GstFlowReturn ret = GST_FLOW_OK;
570   gboolean discont = FALSE;
571   /* Pending events */
572   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
573       NULL, *stream_start = NULL, *buffer_gap = NULL;
574   GList *pending_events = NULL;
575
576   if (stream->compute_segment) {
577     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
578         stream->first_and_live);
579     stream->compute_segment = FALSE;
580     stream->first_and_live = FALSE;
581   }
582
583   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
584     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
585     buffer_gap =
586         gst_event_new_gap (GST_BUFFER_PTS (buffer),
587         GST_BUFFER_DURATION (buffer));
588   }
589
590   if (stream->first_fragment_buffer) {
591     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
592     if (demux->segment.rate < 0)
593       /* Set DISCONT flag for every first buffer in reverse playback mode
594        * as each fragment for its own has to be reversed */
595       discont = TRUE;
596     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
597     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
598
599     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
600
601     /* Do we need to inject STREAM_START and SEGMENT events ?
602      *
603      * This can happen when a stream is restarted, and also when switching to a
604      * variant which needs a header (in which case downloading_header will be
605      * TRUE)
606      */
607     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
608       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
609       pending_segment = gst_event_new_segment (&stream->parse_segment);
610       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
611       stream->send_segment = FALSE;
612       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
613       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
614       stream_start = gst_event_new_stream_start ("bogus");
615       if (demux->have_group_id)
616         gst_event_set_group_id (stream_start, demux->group_id);
617     }
618   } else {
619     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
620   }
621   stream->first_fragment_buffer = FALSE;
622
623   if (stream->discont) {
624     discont = TRUE;
625     stream->discont = FALSE;
626   }
627
628   if (discont) {
629     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
630     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
631   } else {
632     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
633   }
634
635   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
636   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
637   if (G_UNLIKELY (stream->pending_caps)) {
638     pending_caps = gst_event_new_caps (stream->pending_caps);
639     gst_caps_unref (stream->pending_caps);
640     stream->pending_caps = NULL;
641   }
642
643   if (G_UNLIKELY (stream->pending_tags)) {
644     GstTagList *tags = stream->pending_tags;
645
646     stream->pending_tags = NULL;
647
648     if (tags)
649       pending_tags = gst_event_new_tag (tags);
650   }
651   if (G_UNLIKELY (stream->pending_events)) {
652     pending_events = stream->pending_events;
653     stream->pending_events = NULL;
654   }
655
656   /* Do not push events or buffers holding the manifest lock */
657   if (G_UNLIKELY (stream_start)) {
658     GST_DEBUG_OBJECT (stream,
659         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
660     gst_pad_send_event (stream->parsebin_sink, stream_start);
661   }
662   if (G_UNLIKELY (pending_caps)) {
663     GST_DEBUG_OBJECT (stream,
664         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
665     gst_pad_send_event (stream->parsebin_sink, pending_caps);
666   }
667   if (G_UNLIKELY (pending_segment)) {
668     GST_DEBUG_OBJECT (stream,
669         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
670     gst_pad_send_event (stream->parsebin_sink, pending_segment);
671   }
672   if (G_UNLIKELY (pending_tags)) {
673     GST_DEBUG_OBJECT (stream,
674         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
675     gst_pad_send_event (stream->parsebin_sink, pending_tags);
676   }
677   while (pending_events != NULL) {
678     GstEvent *event = pending_events->data;
679
680     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
681     if (!gst_pad_send_event (stream->parsebin_sink, event))
682       GST_ERROR_OBJECT (stream, "Failed to send pending event");
683
684     pending_events = g_list_delete_link (pending_events, pending_events);
685   }
686
687   GST_DEBUG_OBJECT (stream,
688       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
689       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
690       GST_BUFFER_OFFSET (buffer));
691
692   ret = gst_pad_chain (stream->parsebin_sink, buffer);
693
694   if (buffer_gap) {
695     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
696     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
697   }
698
699   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
700     GST_LOG_OBJECT (demux, "Stream was cancelled");
701     return GST_FLOW_FLUSHING;
702   }
703
704   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
705
706   return ret;
707 }
708
709 static GstFlowReturn
710 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
711     GstBuffer * buffer)
712 {
713   GstAdaptiveDemux *demux = stream->demux;
714   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
715   GstFlowReturn ret = GST_FLOW_OK;
716
717   /* do not make any changes if the stream is cancelled */
718   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
719     GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
720     gst_buffer_unref (buffer);
721     return GST_FLOW_FLUSHING;
722   }
723
724   /* starting_fragment is set to TRUE at the beginning of
725    * _stream_download_fragment()
726    * /!\ If there is a header/index being downloaded, then this will
727    * be TRUE for the first one ... but FALSE for the remaining ones,
728    * including the *actual* fragment ! */
729   if (stream->starting_fragment) {
730     stream->starting_fragment = FALSE;
731     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
732       return GST_FLOW_ERROR;
733   }
734
735   stream->download_total_bytes += gst_buffer_get_size (buffer);
736
737   GST_TRACE_OBJECT (stream,
738       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
739       gst_buffer_get_size (buffer));
740
741   ret = klass->data_received (demux, stream, buffer);
742
743   if (ret != GST_FLOW_OK) {
744     GST_DEBUG_OBJECT (stream, "data_received returned %s",
745         gst_flow_get_name (ret));
746
747     if (ret == GST_FLOW_FLUSHING) {
748       /* do not make any changes if the stream is cancelled */
749       if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED) {
750         GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
751         return ret;
752       }
753     }
754
755     if (ret < GST_FLOW_EOS) {
756       GstEvent *eos = gst_event_new_eos ();
757       GST_ELEMENT_FLOW_ERROR (demux, ret);
758
759       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
760
761       /* TODO push this on all pads */
762       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
763       gst_pad_send_event (stream->parsebin_sink, eos);
764       ret = GST_FLOW_ERROR;
765
766       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
767     }
768   }
769
770   return ret;
771 }
772
773 /* Calculate the low and high download buffering watermarks
774  * in time as MAX (low-watermark-time, low-watermark-fragments) and
775  * MIN (high-watermark-time, high-watermark-fragments) respectively
776  */
777 static void
778 calculate_track_thresholds (GstAdaptiveDemux * demux,
779     GstAdaptiveDemux2Stream * stream,
780     GstClockTime fragment_duration, GstClockTime * low_threshold,
781     GstClockTime * high_threshold)
782 {
783   GST_OBJECT_LOCK (demux);
784   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
785   if (*low_threshold == 0 ||
786       (demux->buffering_low_watermark_time != 0
787           && demux->buffering_low_watermark_time > *low_threshold)) {
788     *low_threshold = demux->buffering_low_watermark_time;
789   }
790
791   if (*low_threshold == 0) {
792     /* This implies both low level properties were 0, the default is 10s unless
793      * the subclass has specified a recommended buffering threshold */
794     *low_threshold = 10 * GST_SECOND;
795     if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
796       *low_threshold =
797           MIN (stream->recommended_buffering_threshold, *low_threshold);
798   }
799
800   *high_threshold =
801       demux->buffering_high_watermark_fragments * fragment_duration;
802   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
803           && demux->buffering_high_watermark_time < *high_threshold)) {
804     *high_threshold = demux->buffering_high_watermark_time;
805   }
806
807   /* Make sure the low and high thresholds are less than the maximum buffering
808    * time */
809   if (*high_threshold == 0 ||
810       (demux->max_buffering_time != 0
811           && demux->max_buffering_time < *high_threshold)) {
812     *high_threshold = demux->max_buffering_time;
813   }
814
815   if (*low_threshold == 0 ||
816       (demux->max_buffering_time != 0
817           && demux->max_buffering_time < *low_threshold)) {
818     *low_threshold = demux->max_buffering_time;
819   }
820
821   /* Make sure the high threshold is higher than (or equal to) the low threshold.
822    * It's OK if they are the same, as the minimum download is 1 fragment */
823   if (*high_threshold == 0 ||
824       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
825     *high_threshold = *low_threshold;
826   }
827
828   GST_OBJECT_UNLOCK (demux);
829 }
830
831 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
832 static gboolean
833 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
834     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
835 {
836   gboolean need_to_wait = TRUE;
837   gboolean have_any_tracks = FALSE;
838   gboolean have_active_tracks = FALSE;
839   gboolean have_filled_inactive = FALSE;
840   gboolean update_buffering = FALSE;
841
842   GstClockTime low_threshold = 0, high_threshold = 0;
843   GList *iter;
844
845   calculate_track_thresholds (demux, stream, fragment_duration,
846       &low_threshold, &high_threshold);
847   GST_DEBUG_OBJECT (stream,
848       "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
849       " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
850       GST_TIME_ARGS (high_threshold),
851       GST_TIME_ARGS (stream->recommended_buffering_threshold));
852
853   /* If there are no tracks at all, don't wait. If there are no active
854    * tracks, keep filling until at least one track is full. If there
855    * are active tracks, require that they are all full */
856   TRACKS_LOCK (demux);
857   for (iter = stream->tracks; iter; iter = iter->next) {
858     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
859
860     /* Update the buffering threshold if it changed by more than a second */
861     if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
862       GST_DEBUG_OBJECT (stream, "Updating threshold");
863       /* The buffering threshold for this track changed, make sure to
864        * re-check buffering status */
865       update_buffering = TRUE;
866       track->buffering_threshold = low_threshold;
867     }
868
869     have_any_tracks = TRUE;
870     if (track->active)
871       have_active_tracks = TRUE;
872
873     if (track->level_time < high_threshold) {
874       if (track->active) {
875         need_to_wait = FALSE;
876         GST_DEBUG_OBJECT (stream,
877             "track %s has level %" GST_TIME_FORMAT
878             " - needs more data (target %" GST_TIME_FORMAT
879             ") (fragment duration %" GST_TIME_FORMAT ")",
880             track->stream_id, GST_TIME_ARGS (track->level_time),
881             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
882         continue;
883       }
884     } else if (!track->active) {        /* track is over threshold and inactive */
885       have_filled_inactive = TRUE;
886     }
887
888     GST_DEBUG_OBJECT (stream,
889         "track %s active (%d) has level %" GST_TIME_FORMAT,
890         track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
891   }
892
893   /* If there are no tracks, don't wait (we might need data to create them),
894    * or if there are active tracks that need more data to hit the threshold,
895    * don't wait. Otherwise it means all active tracks are full and we should wait */
896   if (!have_any_tracks) {
897     GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
898     need_to_wait = FALSE;
899   } else if (!have_active_tracks && !have_filled_inactive) {
900     GST_DEBUG_OBJECT (stream,
901         "have only inactive tracks that need more data - not waiting");
902     need_to_wait = FALSE;
903   }
904
905   if (need_to_wait) {
906     stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
907
908     for (iter = stream->tracks; iter; iter = iter->next) {
909       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
910
911       GST_DEBUG_OBJECT (stream,
912           "Waiting for queued data on track %s to drop below %"
913           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
914           track->stream_id, GST_TIME_ARGS (high_threshold),
915           GST_TIME_ARGS (fragment_duration));
916
917       /* we want to get woken up when the global output position reaches
918        * a point where the input is closer than "high_threshold" to needing
919        * output, so we can put more data in */
920       GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
921
922       if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
923           wakeup_time < stream->next_input_wakeup_time) {
924         stream->next_input_wakeup_time = wakeup_time;
925
926         GST_DEBUG_OBJECT (stream,
927             "Track %s level %" GST_TIME_FORMAT ". Input at position %"
928             GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
929             GST_TIME_FORMAT, track->stream_id,
930             GST_TIME_ARGS (track->level_time),
931             GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
932             GST_TIME_ARGS (demux->priv->global_output_position));
933       }
934     }
935
936     if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
937       GST_DEBUG_OBJECT (stream,
938           "Next input wakeup time is now %" GST_TIME_FORMAT,
939           GST_TIME_ARGS (stream->next_input_wakeup_time));
940
941       /* If this stream needs waking up sooner than any other current one,
942        * update the period wakeup time, which is what the output loop
943        * will check */
944       GstAdaptiveDemuxPeriod *period = stream->period;
945       if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
946           period->next_input_wakeup_time > stream->next_input_wakeup_time) {
947         period->next_input_wakeup_time = stream->next_input_wakeup_time;
948       }
949     }
950   }
951
952   if (update_buffering) {
953     demux_update_buffering_locked (demux);
954     demux_post_buffering_locked (demux);
955   }
956
957   TRACKS_UNLOCK (demux);
958
959   return need_to_wait;
960 }
961
962 static GstAdaptiveDemuxTrack *
963 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
964 {
965   GList *tmp;
966   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
967   gint num_possible_tracks = 0;
968   GstStream *gst_stream;
969   const gchar *internal_stream_id;
970   GstStreamType stream_type;
971
972   gst_stream = gst_pad_get_stream (pad);
973
974   /* FIXME: Edward: Added assertion because I don't see in what cases we would
975    * end up with a pad from parsebin which wouldn't have an associated
976    * GstStream. */
977   g_assert (gst_stream);
978
979   internal_stream_id = gst_stream_get_stream_id (gst_stream);
980   stream_type = gst_stream_get_stream_type (gst_stream);
981
982   GST_DEBUG_OBJECT (pad,
983       "Trying to match pad from parsebin with internal streamid %s and caps %"
984       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
985       gst_stream_get_caps (gst_stream));
986
987   /* Try to match directly by the track's pending upstream_stream_id */
988   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
989     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
990
991     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
992       continue;
993
994     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
995         track->upstream_stream_id);
996
997     if (first_matched_track == NULL)
998       first_matched_track = track;
999     num_possible_tracks++;
1000
1001     /* If this track has a desired upstream stream id, match on it */
1002     if (track->upstream_stream_id == NULL ||
1003         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
1004       /* This is not the track for this pad */
1005       continue;
1006     }
1007
1008     /* Remove pending upstream id (we have matched it for the pending
1009      * stream_id) */
1010     g_free (track->upstream_stream_id);
1011     track->upstream_stream_id = NULL;
1012     found_track = track;
1013     break;
1014   }
1015
1016   if (found_track == NULL) {
1017     /* If we arrive here, it means the stream is switching pads after
1018      * the stream has already started running */
1019     /* No track is currently waiting for this particular stream id -
1020      * try and match an existing linked track. If there's only 1 possible,
1021      * take it. */
1022     if (num_possible_tracks == 1 && first_matched_track != NULL) {
1023       GST_LOG_OBJECT (pad, "Only one possible track to link to");
1024       found_track = first_matched_track;
1025     }
1026   }
1027
1028   if (found_track == NULL) {
1029     /* TODO: There are multiple possible tracks, need to match based
1030      * on language code and caps. Have you found a stream like this? */
1031     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1032   }
1033
1034   if (found_track != NULL) {
1035     if (!gst_pad_is_linked (found_track->sinkpad)) {
1036       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1037           found_track->sinkpad);
1038
1039       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1040         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1041         /* FIXME : Do something if we can't link ? */
1042       }
1043     } else {
1044       /* Store pad as pending link */
1045       GST_LOG_OBJECT (pad,
1046           "Remembering pad to be linked when current pad is unlinked");
1047       g_assert (found_track->pending_srcpad == NULL);
1048       found_track->pending_srcpad = gst_object_ref (pad);
1049     }
1050   }
1051
1052   if (gst_stream)
1053     gst_object_unref (gst_stream);
1054
1055   return found_track;
1056 }
1057
1058 static void
1059 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1060     GstAdaptiveDemux2Stream * stream)
1061 {
1062   GList *iter;
1063   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1064
1065   /* Remove from pending source pad */
1066   TRACKS_LOCK (stream->demux);
1067   for (iter = stream->tracks; iter; iter = iter->next) {
1068     GstAdaptiveDemuxTrack *track = iter->data;
1069     if (track->pending_srcpad == pad) {
1070       gst_object_unref (track->pending_srcpad);
1071       track->pending_srcpad = NULL;
1072       break;
1073     }
1074   }
1075   TRACKS_UNLOCK (stream->demux);
1076 }
1077
1078 static void
1079 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1080     GstAdaptiveDemux2Stream * stream)
1081 {
1082   if (!GST_PAD_IS_SRC (pad))
1083     return;
1084
1085   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1086
1087   if (!match_parsebin_to_track (stream, pad))
1088     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1089
1090   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1091 }
1092
1093 static void
1094 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1095     GstElement * element, GstAdaptiveDemux * demux)
1096 {
1097   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1098     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1099     g_object_set (element, "ignore-pcr", TRUE, NULL);
1100   }
1101 }
1102
1103 /* must be called with manifest_lock taken */
1104 static gboolean
1105 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1106 {
1107   GstAdaptiveDemux *demux = stream->demux;
1108
1109   if (stream->parsebin == NULL) {
1110     GstEvent *event;
1111
1112     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1113
1114     /* Workaround to detect if tsdemux is being used */
1115     if (tsdemux_type == 0) {
1116       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1117       if (element) {
1118         tsdemux_type = G_OBJECT_TYPE (element);
1119         gst_object_unref (element);
1120       }
1121     }
1122
1123     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1124     if (tsdemux_type)
1125       g_signal_connect (stream->parsebin, "deep-element-added",
1126           (GCallback) parsebin_deep_element_added_cb, demux);
1127     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1128     stream->parsebin_sink =
1129         gst_element_get_static_pad (stream->parsebin, "sink");
1130     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1131         G_CALLBACK (parsebin_pad_added_cb), stream);
1132     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1133         G_CALLBACK (parsebin_pad_removed_cb), stream);
1134
1135     event = gst_event_new_stream_start ("bogus");
1136     if (demux->have_group_id)
1137       gst_event_set_group_id (event, demux->group_id);
1138
1139     gst_pad_send_event (stream->parsebin_sink, event);
1140
1141     /* Not sure if these need to be outside the manifest lock: */
1142     gst_element_sync_state_with_parent (stream->parsebin);
1143     stream->last_status_code = 200;     /* default to OK */
1144   }
1145   return TRUE;
1146 }
1147
1148 static void
1149 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1150     GstAdaptiveDemux2Stream * stream)
1151 {
1152 }
1153
1154 static void
1155 on_download_error (DownloadRequest * request, DownloadRequestState state,
1156     GstAdaptiveDemux2Stream * stream)
1157 {
1158   GstAdaptiveDemux *demux = stream->demux;
1159   guint last_status_code = request->status_code;
1160   gboolean live;
1161
1162   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1163     GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1164         stream->state);
1165     return;
1166   }
1167
1168   stream->download_active = FALSE;
1169   stream->last_status_code = last_status_code;
1170
1171   GST_DEBUG_OBJECT (stream,
1172       "Download finished with error, request state %d http status %u, dc %d",
1173       request->state, last_status_code, stream->download_error_count);
1174
1175   live = gst_adaptive_demux_is_live (demux);
1176   if (((last_status_code / 100 == 4 && live)
1177           || last_status_code / 100 == 5)) {
1178     /* 4xx/5xx */
1179     /* if current position is before available start, switch to next */
1180     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1181       goto flushing;
1182
1183     if (live) {
1184       gint64 range_start, range_stop;
1185
1186       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1187               &range_stop))
1188         goto flushing;
1189
1190       if (demux->segment.position < range_start) {
1191         GstFlowReturn ret;
1192
1193         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1194         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1195
1196         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1197
1198         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1199         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1200             gst_flow_get_name (ret));
1201
1202         if (ret == GST_FLOW_OK)
1203           goto again;
1204
1205       } else if (demux->segment.position > range_stop) {
1206         /* wait a bit to be in range, we don't have any locks at that point */
1207         GstClockTime wait_time =
1208             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1209             stream);
1210         if (wait_time > 0) {
1211           GST_DEBUG_OBJECT (stream,
1212               "Download waiting for %" GST_TIME_FORMAT,
1213               GST_TIME_ARGS (wait_time));
1214           g_assert (stream->pending_cb_id == 0);
1215           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1216           stream->pending_cb_id =
1217               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1218               wait_time,
1219               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1220               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1221         }
1222       }
1223     }
1224
1225   flushing:
1226     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1227       /* looks like there is no way of knowing when a live stream has ended
1228        * Have to assume we are falling behind and cause a manifest reload */
1229       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1230       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1231       return;
1232     }
1233   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1234     /* If this is the last fragment, consider failures EOS and not actual
1235      * errors. Due to rounding errors in the durations, the last fragment
1236      * might not actually exist */
1237     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1238     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1239     return;
1240   } else {
1241     /* retry same segment */
1242     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1243       gst_adaptive_demux2_stream_error (stream);
1244       return;
1245     }
1246     goto again;
1247   }
1248
1249 again:
1250   /* wait a short time in case the server needs a bit to recover */
1251   GST_LOG_OBJECT (stream,
1252       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1253   g_assert (stream->pending_cb_id == 0);
1254   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1255       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1256       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1257 }
1258
1259 static void
1260 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1261     DownloadRequest * request)
1262 {
1263   GstClockTimeDiff last_download_duration;
1264   guint64 fragment_bytes_downloaded = request->content_received;
1265
1266   /* The stream last_download time tracks the full download time for now */
1267   stream->last_download_time =
1268       GST_CLOCK_DIFF (request->download_request_time,
1269       request->download_end_time);
1270
1271   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1272   last_download_duration =
1273       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1274
1275   /* If the entire response arrived in the first buffer
1276    * though, include the request time to get a valid
1277    * bitrate estimate */
1278   if (last_download_duration < 2 * stream->last_download_time)
1279     last_download_duration = stream->last_download_time;
1280
1281   if (last_download_duration > 0) {
1282     stream->last_bitrate =
1283         gst_util_uint64_scale (fragment_bytes_downloaded,
1284         8 * GST_SECOND, last_download_duration);
1285
1286     GST_DEBUG_OBJECT (stream,
1287         "Updated stream bitrate. %" G_GUINT64_FORMAT
1288         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1289         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1290         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1291   }
1292 }
1293
1294 static void
1295 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1296     GstAdaptiveDemux2Stream * stream)
1297 {
1298   GstAdaptiveDemux *demux = stream->demux;
1299   GstBuffer *buffer = download_request_take_buffer (request);
1300
1301   if (buffer) {
1302     GstFlowReturn ret;
1303
1304     GST_DEBUG_OBJECT (stream,
1305         "Handling buffer of %" G_GSIZE_FORMAT
1306         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1307         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1308         request->content_received, request->content_length);
1309
1310     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1311     download_request_unlock (request);
1312     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1313     download_request_lock (request);
1314
1315     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1316       return;
1317
1318     if (ret != GST_FLOW_OK) {
1319       GST_DEBUG_OBJECT (stream,
1320           "Buffer parsing returned: %d %s. Aborting download", ret,
1321           gst_flow_get_name (ret));
1322
1323       if (!stream->downloading_header && !stream->downloading_index)
1324         update_stream_bitrate (stream, request);
1325
1326       downloadhelper_cancel_request (demux->download_helper, request);
1327
1328       /* cancellation is async, so recycle our download request to avoid races */
1329       download_request_unref (stream->download_request);
1330       stream->download_request = download_request_new ();
1331
1332       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1333     }
1334   }
1335 }
1336
1337 static void
1338 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1339     GstAdaptiveDemux2Stream * stream)
1340 {
1341   GstFlowReturn ret = GST_FLOW_OK;
1342   GstBuffer *buffer;
1343
1344   stream->download_active = FALSE;
1345
1346   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1347     GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1348         stream->state);
1349     return;
1350   }
1351
1352   GST_DEBUG_OBJECT (stream,
1353       "Stream %p %s download for %s is complete with state %d",
1354       stream, uritype (stream), request->uri, request->state);
1355
1356   /* Update bitrate for fragment downloads */
1357   if (!stream->downloading_header && !stream->downloading_index)
1358     update_stream_bitrate (stream, request);
1359
1360   buffer = download_request_take_buffer (request);
1361   if (buffer)
1362     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1363
1364   GST_DEBUG_OBJECT (stream,
1365       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1366       request->uri, ret, gst_flow_get_name (ret), stream->state);
1367
1368   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1369     return;
1370
1371   g_assert (stream->pending_cb_id == 0);
1372   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1373 }
1374
1375 /* must be called from the scheduler context
1376  *
1377  * Will submit the request only, which will complete asynchronously
1378  */
1379 static GstFlowReturn
1380 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1381     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1382     gint64 end)
1383 {
1384   DownloadRequest *request = stream->download_request;
1385
1386   GST_DEBUG_OBJECT (demux,
1387       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1388       uritype (stream), uri, start, end);
1389
1390   if (!gst_adaptive_demux2_stream_create_parser (stream))
1391     return GST_FLOW_ERROR;
1392
1393   /* Configure our download request */
1394   download_request_set_uri (request, uri, start, end);
1395
1396   if (stream->downloading_header || stream->downloading_index) {
1397     download_request_set_callbacks (request,
1398         (DownloadRequestEventCallback) on_download_complete,
1399         (DownloadRequestEventCallback) on_download_error,
1400         (DownloadRequestEventCallback) on_download_cancellation,
1401         (DownloadRequestEventCallback) NULL, stream);
1402   } else {
1403     download_request_set_callbacks (request,
1404         (DownloadRequestEventCallback) on_download_complete,
1405         (DownloadRequestEventCallback) on_download_error,
1406         (DownloadRequestEventCallback) on_download_cancellation,
1407         (DownloadRequestEventCallback) on_download_progress, stream);
1408   }
1409
1410   if (!downloadhelper_submit_request (demux->download_helper,
1411           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1412     return GST_FLOW_ERROR;
1413
1414   stream->download_active = TRUE;
1415
1416   return GST_FLOW_OK;
1417 }
1418
1419 /* must be called from the scheduler context */
1420 static GstFlowReturn
1421 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1422 {
1423   GstAdaptiveDemux *demux = stream->demux;
1424   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1425   gchar *url = NULL;
1426
1427   /* FIXME :  */
1428   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1429   if (stream->starting_fragment) {
1430     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1431         stream->fragment.uri ? "FRAGMENT " : "",
1432         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1433         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1434
1435     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1436         stream->fragment.index_uri == NULL)
1437       goto no_url_error;
1438
1439     stream->first_fragment_buffer = TRUE;
1440     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1441   }
1442
1443   if (stream->need_header && stream->fragment.header_uri != NULL) {
1444
1445     /* Set the need_index flag when we start the header if we'll also need the index */
1446     stream->need_index = (stream->fragment.index_uri != NULL);
1447
1448     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1449         G_GINT64_FORMAT, stream->fragment.header_uri,
1450         stream->fragment.header_range_start, stream->fragment.header_range_end);
1451
1452     stream->downloading_header = TRUE;
1453
1454     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1455         stream->fragment.header_uri, stream->fragment.header_range_start,
1456         stream->fragment.header_range_end);
1457   }
1458
1459   /* check if we have an index */
1460   if (stream->need_index && stream->fragment.index_uri != NULL) {
1461     GST_DEBUG_OBJECT (stream,
1462         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1463         stream->fragment.index_uri,
1464         stream->fragment.index_range_start, stream->fragment.index_range_end);
1465
1466     stream->downloading_index = TRUE;
1467
1468     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1469         stream->fragment.index_uri, stream->fragment.index_range_start,
1470         stream->fragment.index_range_end);
1471   }
1472
1473   url = stream->fragment.uri;
1474   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1475   if (!url)
1476     return GST_FLOW_OK;
1477
1478   /* Download the actual fragment, either in chunks or in one go */
1479   stream->first_fragment_buffer = TRUE;
1480
1481   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1482       && stream->fragment.chunk_size != 0) {
1483     /* Handle chunk downloading */
1484     gint64 range_start = stream->fragment.range_start;
1485     gint64 range_end = stream->fragment.range_end;
1486     gint chunk_size = stream->fragment.chunk_size;
1487     gint64 chunk_end;
1488
1489     /* HTTP ranges are inclusive for the end */
1490     if (chunk_size != -1) {
1491       chunk_end = range_start + chunk_size - 1;
1492       if (range_end != -1 && range_end < chunk_end)
1493         chunk_end = range_end;
1494     } else {
1495       chunk_end = range_end;
1496     }
1497
1498     GST_DEBUG_OBJECT (stream,
1499         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1500         url, range_start, chunk_end);
1501     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1502         range_start, chunk_end);
1503   }
1504
1505   /* regular single chunk download */
1506   stream->fragment.chunk_size = 0;
1507
1508   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1509       stream->fragment.range_start, stream->fragment.range_end);
1510
1511 no_url_error:
1512   {
1513     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1514         (_("Failed to get fragment URL.")),
1515         ("An error happened when getting fragment URL"));
1516     return GST_FLOW_ERROR;
1517   }
1518 }
1519
1520 static gboolean
1521 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1522     GstEvent * event)
1523 {
1524   gboolean ret = TRUE;
1525   GstPad *pad;
1526
1527   /* If there's a parsebin, push the event through it */
1528   if (stream->parsebin_sink != NULL) {
1529     pad = gst_object_ref (stream->parsebin_sink);
1530     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1531     ret = gst_pad_send_event (pad, gst_event_ref (event));
1532     gst_object_unref (pad);
1533   }
1534
1535   /* If the event is EOS, ensure that all tracks are EOS. This catches
1536    * the case where the parsebin hasn't parsed anything yet (we switched
1537    * to a never before used track right near EOS, or it didn't parse enough
1538    * to create pads and be able to send EOS through to the tracks.
1539    *
1540    * We don't need to care about any other events
1541    */
1542   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1543     GList *iter;
1544
1545     for (iter = stream->tracks; iter; iter = iter->next) {
1546       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1547       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1548     }
1549   }
1550
1551   gst_event_unref (event);
1552   return ret;
1553 }
1554
1555 static void
1556 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1557 {
1558   GstAdaptiveDemux *demux = stream->demux;
1559   GstMessage *msg;
1560   GstStructure *details;
1561
1562   details = gst_structure_new_empty ("details");
1563   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1564       stream->last_status_code, NULL);
1565
1566   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1567
1568   if (stream->last_error) {
1569     gchar *debug = g_strdup_printf ("Error on stream %s",
1570         GST_OBJECT_NAME (stream));
1571     msg =
1572         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1573         stream->last_error, debug, details);
1574     GST_ERROR_OBJECT (stream, "Download error: %s",
1575         stream->last_error->message);
1576     g_free (debug);
1577   } else {
1578     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1579         _("Couldn't download fragments"));
1580     msg =
1581         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1582         "Fragment downloading has failed consecutive times", details);
1583     g_error_free (err);
1584     GST_ERROR_OBJECT (stream,
1585         "Download error: Couldn't download fragments, too many failures");
1586   }
1587
1588   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1589 }
1590
1591 /* Called when a stream reaches the end of a playback segment */
1592 static void
1593 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1594 {
1595   GstAdaptiveDemux *demux = stream->demux;
1596   GstFlowReturn combined =
1597       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1598
1599   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1600
1601   if (gst_adaptive_demux_has_next_period (demux)) {
1602     if (combined == GST_FLOW_EOS) {
1603       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1604       gst_adaptive_demux_advance_period (demux);
1605     } else {
1606       /* Ensure the 'has_next_period' flag is set on the period before
1607        * pushing EOS to the stream, so that the output loop knows not
1608        * to actually output the event */
1609       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1610       demux->input_period->has_next_period = TRUE;
1611     }
1612   }
1613
1614   if (demux->priv->outputs) {
1615     GstEvent *eos = gst_event_new_eos ();
1616
1617     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1618     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1619
1620     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1621     gst_adaptive_demux2_stream_push_event (stream, eos);
1622   } else {
1623     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1624     gst_adaptive_demux2_stream_error (stream);
1625   }
1626 }
1627
1628 static gboolean
1629 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1630 {
1631   GstAdaptiveDemux *demux = stream->demux;
1632
1633   gboolean is_live = gst_adaptive_demux_is_live (demux);
1634
1635   stream->pending_cb_id = 0;
1636
1637   /* Refetch the playlist now after we waited */
1638   /* FIXME: Make this manifest update async and handle it on completion */
1639   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1640     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1641   }
1642
1643   /* We were called here from a timeout, so if the load function wants to loop
1644    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1645    * way */
1646   while (gst_adaptive_demux2_stream_next_download (stream));
1647
1648   return G_SOURCE_REMOVE;
1649 }
1650
1651 static gboolean
1652 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1653     * stream)
1654 {
1655   /* If the state already moved on, the stream was stopped, or another track
1656    * already woke up and needed data */
1657   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1658     return G_SOURCE_REMOVE;
1659
1660   GstAdaptiveDemux *demux = stream->demux;
1661   TRACKS_LOCK (demux);
1662
1663   GList *iter;
1664   for (iter = stream->tracks; iter; iter = iter->next) {
1665     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1666
1667     /* We need to recompute the track's level_time value, as the
1668      * global output position may have advanced and reduced the
1669      * value, even without anything being dequeued yet */
1670     gst_adaptive_demux_track_update_level_locked (track);
1671
1672     GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1673         " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1674         track->stream_id, GST_TIME_ARGS (track->level_time),
1675         GST_TIME_ARGS (track->input_time),
1676         GST_TIME_ARGS (demux->priv->global_output_position));
1677   }
1678   TRACKS_UNLOCK (demux);
1679
1680   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1681
1682   return G_SOURCE_REMOVE;
1683 }
1684
1685 void
1686 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1687     stream)
1688 {
1689   GstAdaptiveDemux *demux = stream->demux;
1690
1691   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1692
1693   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1694
1695   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1696       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1697       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1698 }
1699
1700 void
1701 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1702 {
1703   GstAdaptiveDemux *demux = stream->demux;
1704
1705   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1706     return;
1707
1708   g_assert (stream->pending_cb_id == 0);
1709
1710   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1711   stream->pending_cb_id =
1712       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1713       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1714       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1715 }
1716
1717 static void
1718 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1719     stream)
1720 {
1721   GstAdaptiveDemux *demux = stream->demux;
1722
1723   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1724           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1725
1726     if (!gst_adaptive_demux_has_next_period (demux)) {
1727       /* Wait only if we can ensure current manifest has been expired.
1728        * The meaning "we have next period" *WITH* EOS is that, current
1729        * period has been ended but we can continue to the next period */
1730       GST_DEBUG_OBJECT (stream,
1731           "Live playlist EOS - waiting for manifest update");
1732       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1733       /* Clear the stream last_ret EOS state, since we're not actually EOS */
1734       if (stream->last_ret == GST_FLOW_EOS)
1735         stream->last_ret = GST_FLOW_OK;
1736       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1737       return;
1738     }
1739   }
1740
1741   gst_adaptive_demux2_stream_end_of_manifest (stream);
1742 }
1743
1744 static gboolean
1745 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1746 {
1747   GstAdaptiveDemux *demux = stream->demux;
1748   gboolean live = gst_adaptive_demux_is_live (demux);
1749   GstFlowReturn ret = GST_FLOW_OK;
1750
1751   stream->pending_cb_id = 0;
1752
1753   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1754
1755   switch (stream->state) {
1756     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1757     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1758     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1759     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1760     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1761       /* Get information about the fragment to download */
1762       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1763       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1764       GST_DEBUG_OBJECT (stream,
1765           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1766
1767       if (ret == GST_FLOW_OK)
1768         stream->starting_fragment = TRUE;
1769       break;
1770     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1771       break;
1772     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1773       GST_ERROR_OBJECT (stream,
1774           "Unexpected stream state EOS. The stream should not be running now.");
1775       return FALSE;
1776     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED:
1777       /* The stream was stopped. Just finish up */
1778       return FALSE;
1779     default:
1780       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1781       g_assert_not_reached ();
1782       break;
1783   }
1784
1785   if (ret == GST_FLOW_OK) {
1786     /* Wait for room in the output tracks */
1787     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1788             stream->fragment.duration)) {
1789       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1790       return FALSE;
1791     }
1792   }
1793
1794   if (ret == GST_FLOW_OK) {
1795     /* wait for live fragments to be available */
1796     if (live) {
1797       GstClockTime wait_time =
1798           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1799       if (wait_time > 0) {
1800         GST_DEBUG_OBJECT (stream,
1801             "Download waiting for %" GST_TIME_FORMAT,
1802             GST_TIME_ARGS (wait_time));
1803
1804         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1805
1806         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1807         g_assert (stream->pending_cb_id == 0);
1808         stream->pending_cb_id =
1809             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1810             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1811             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1812         return FALSE;
1813       }
1814     }
1815
1816     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1817       GST_ERROR_OBJECT (demux,
1818           "Failed to begin fragment download for stream %p", stream);
1819       return FALSE;
1820     }
1821   }
1822
1823   /* Cast to int avoids a compiler warning that
1824    * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1825   switch ((int) ret) {
1826     case GST_FLOW_OK:
1827       break;                    /* all is good, let's go */
1828     case GST_FLOW_EOS:
1829       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1830       stream->last_ret = ret;
1831       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1832       return FALSE;
1833     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1834       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1835       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1836       gst_adaptive_demux_handle_lost_sync (demux);
1837       return FALSE;
1838     case GST_FLOW_NOT_LINKED:
1839     {
1840       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1841
1842       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1843           == GST_FLOW_NOT_LINKED) {
1844         GST_ELEMENT_FLOW_ERROR (demux, ret);
1845       }
1846     }
1847       break;
1848
1849     case GST_FLOW_FLUSHING:
1850       /* Flushing is normal, the target track might have been unselected */
1851       GST_DEBUG_OBJECT (stream, "Got flushing return. Stopping callback.");
1852       return FALSE;
1853     default:
1854       if (ret <= GST_FLOW_ERROR) {
1855         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1856         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1857           gst_adaptive_demux2_stream_error (stream);
1858           return FALSE;
1859         }
1860
1861         g_clear_error (&stream->last_error);
1862
1863         /* First try to update the playlist for non-live playlists
1864          * in case the URIs have changed in the meantime. But only
1865          * try it the first time, after that we're going to wait a
1866          * a bit to not flood the server */
1867         if (stream->download_error_count == 1
1868             && !gst_adaptive_demux_is_live (demux)) {
1869           /* TODO hlsdemux had more options to this function (boolean and err) */
1870           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1871             /* Retry immediately, the playlist actually has changed */
1872             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1873             return TRUE;
1874           }
1875         }
1876
1877         /* Wait half the fragment duration before retrying */
1878         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1879         g_assert (stream->pending_cb_id == 0);
1880         stream->pending_cb_id =
1881             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1882             stream->fragment.duration / 2,
1883             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1884             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1885         return FALSE;
1886       }
1887       break;
1888   }
1889
1890   return FALSE;
1891 }
1892
1893 static gboolean
1894 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1895 {
1896   GstAdaptiveDemux *demux = stream->demux;
1897   gboolean end_of_manifest = FALSE;
1898
1899   GST_LOG_OBJECT (stream, "Looking for next download");
1900
1901   /* Restarting download, figure out new position
1902    * FIXME : Move this to a separate function ? */
1903   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1904     GstClockTimeDiff stream_time = 0;
1905
1906     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1907
1908     if (stream->parsebin_sink != NULL) {
1909       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1910        * this is the first time we've used this stream, so it's all good) */
1911       gst_adaptive_demux2_stream_push_event (stream,
1912           gst_event_new_flush_start ());
1913       gst_adaptive_demux2_stream_push_event (stream,
1914           gst_event_new_flush_stop (FALSE));
1915     }
1916
1917     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1918     stream_time = stream->start_position;
1919
1920     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1921         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1922
1923     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1924       /* TODO check return */
1925       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1926           0, stream_time, &stream_time);
1927       stream->current_position = stream->start_position;
1928
1929       GST_DEBUG_OBJECT (stream,
1930           "stream_time after restart seek: %" GST_STIME_FORMAT
1931           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1932           GST_STIME_ARGS (stream->current_position));
1933     }
1934
1935     /* Trigger (re)computation of the parsebin input segment */
1936     stream->compute_segment = TRUE;
1937
1938     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1939
1940     stream->discont = TRUE;
1941     stream->need_header = TRUE;
1942     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1943   }
1944
1945   /* Check if we're done with our segment */
1946   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1947   if (demux->segment.rate > 0) {
1948     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1949         && stream->current_position >= demux->segment.stop) {
1950       end_of_manifest = TRUE;
1951     }
1952   } else {
1953     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1954         && stream->current_position <= demux->segment.start) {
1955       end_of_manifest = TRUE;
1956     }
1957   }
1958   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1959
1960   if (end_of_manifest) {
1961     gst_adaptive_demux2_stream_end_of_manifest (stream);
1962     return FALSE;
1963   }
1964   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1965 }
1966
1967 static gboolean
1968 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1969 {
1970   GstAdaptiveDemux *demux = stream->demux;
1971   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1972
1973   if (!klass->stream_can_start)
1974     return TRUE;
1975   return klass->stream_can_start (demux, stream);
1976 }
1977
1978 /**
1979  * gst_adaptive_demux2_stream_start:
1980  * @stream: a #GstAdaptiveDemux2Stream
1981  *
1982  * Start the given @stream. Should be called by subclasses that previously
1983  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1984  */
1985 void
1986 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1987 {
1988   GstAdaptiveDemux *demux;
1989
1990   g_return_if_fail (stream && stream->demux);
1991
1992   demux = stream->demux;
1993
1994   if (stream->pending_cb_id != 0 || stream->download_active) {
1995     /* There is already something active / pending on this stream */
1996     GST_LOG_OBJECT (stream, "Stream already running");
1997     return;
1998   }
1999
2000   /* Some streams require a delayed start, i.e. they need more information
2001    * before they can actually be started */
2002   if (!gst_adaptive_demux2_stream_can_start (stream)) {
2003     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
2004     return;
2005   }
2006
2007   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
2008     GST_LOG_OBJECT (stream, "Stream is EOS already");
2009     return;
2010   }
2011
2012   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2013       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
2014     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
2015         stream->state);
2016     stream->last_ret = GST_FLOW_OK;
2017
2018     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2019       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
2020   }
2021
2022   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
2023   stream->pending_cb_id =
2024       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2025       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
2026       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
2027 }
2028
2029 void
2030 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
2031 {
2032   GstAdaptiveDemux *demux = stream->demux;
2033
2034   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
2035   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
2036
2037   if (stream->pending_cb_id != 0) {
2038     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2039         stream->pending_cb_id);
2040     stream->pending_cb_id = 0;
2041   }
2042
2043   /* Cancel and drop the existing download request */
2044   downloadhelper_cancel_request (demux->download_helper,
2045       stream->download_request);
2046   download_request_unref (stream->download_request);
2047   stream->downloading_header = stream->downloading_index = FALSE;
2048   stream->download_request = download_request_new ();
2049   stream->download_active = FALSE;
2050
2051   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
2052 }
2053
2054 gboolean
2055 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2056 {
2057   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2058     return FALSE;
2059   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2060     return FALSE;
2061   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
2062     return FALSE;
2063   return TRUE;
2064 }
2065
2066 gboolean
2067 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
2068 {
2069   GList *tmp;
2070
2071   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2072     GstAdaptiveDemuxTrack *track = tmp->data;
2073     if (track->selected)
2074       return TRUE;
2075   }
2076
2077   return FALSE;
2078 }
2079
2080 /**
2081  * gst_adaptive_demux2_stream_is_selected:
2082  * @stream: A #GstAdaptiveDemux2Stream
2083  *
2084  * Returns: %TRUE if any of the tracks targetted by @stream is selected
2085  */
2086 gboolean
2087 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
2088 {
2089   gboolean ret;
2090
2091   g_return_val_if_fail (stream && stream->demux, FALSE);
2092
2093   TRACKS_LOCK (stream->demux);
2094   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2095   TRACKS_UNLOCK (stream->demux);
2096
2097   return ret;
2098 }