adaptivedemux2: Improve minimum buffering threshold
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
63
64   stream->fragment_bitrates =
65       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
66
67   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
68
69   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
70 }
71
72 /* must be called with manifest_lock taken.
73  * It will temporarily drop the manifest_lock in order to join the task.
74  * It will join only the old_streams (the demux->streams are joined by
75  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
76  * called)
77  */
78 static void
79 gst_adaptive_demux2_stream_finalize (GObject * object)
80 {
81   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
82
83   GST_LOG_OBJECT (object, "Finalizing");
84
85   if (stream->download_request)
86     download_request_unref (stream->download_request);
87
88   stream->cancelled = TRUE;
89   g_clear_error (&stream->last_error);
90
91   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
92
93   if (stream->pending_events) {
94     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
95     stream->pending_events = NULL;
96   }
97
98   if (stream->parsebin_sink) {
99     gst_object_unref (stream->parsebin_sink);
100     stream->parsebin_sink = NULL;
101   }
102
103   if (stream->pad_added_id)
104     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
105   if (stream->pad_removed_id)
106     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
107
108   if (stream->parsebin != NULL) {
109     GST_LOG_OBJECT (stream, "Removing parsebin");
110     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
111     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
112     gst_object_unref (stream->parsebin);
113     stream->parsebin = NULL;
114   }
115
116   g_free (stream->fragment_bitrates);
117
118   g_list_free_full (stream->tracks,
119       (GDestroyNotify) gst_adaptive_demux_track_unref);
120
121   if (stream->pending_caps)
122     gst_caps_unref (stream->pending_caps);
123
124   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
125   g_clear_pointer (&stream->stream_collection, gst_object_unref);
126
127   G_OBJECT_CLASS (parent_class)->finalize (object);
128 }
129
130 /**
131  * gst_adaptive_demux2_stream_add_track:
132  * @stream: A #GstAdaptiveDemux2Stream
133  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
134  *
135  * This function is called when a subclass knows of a target @track that this
136  * @stream can provide.
137  */
138 gboolean
139 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
140     GstAdaptiveDemuxTrack * track)
141 {
142   g_return_val_if_fail (track != NULL, FALSE);
143
144   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
145       track->stream_id);
146   if (g_list_find (stream->tracks, track)) {
147     GST_DEBUG_OBJECT (stream->demux,
148         "track '%s' already handled by this stream", track->stream_id);
149     return FALSE;
150   }
151
152   if (stream->demux->buffering_low_watermark_time)
153     track->buffering_threshold = stream->demux->buffering_low_watermark_time;
154   else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
155     track->buffering_threshold =
156         MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
157   else {
158     /* Using a starting default, can be overriden later in
159      * ::update_stream_info() */
160     GST_DEBUG_OBJECT (stream,
161         "Setting default 10s buffering threshold on new track");
162     track->buffering_threshold = 10 * GST_SECOND;
163   }
164
165   stream->tracks =
166       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
167   if (stream->demux) {
168     g_assert (stream->period);
169     gst_adaptive_demux_period_add_track (stream->period, track);
170   }
171   return TRUE;
172 }
173
174 static gboolean
175 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
176 static gboolean
177 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
178 static void
179 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
180     stream);
181 static GstFlowReturn
182 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
183     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
184     gint64 end);
185
186 #ifndef GST_DISABLE_GST_DEBUG
187 static const char *
188 uritype (GstAdaptiveDemux2Stream * s)
189 {
190   if (s->downloading_header)
191     return "header";
192   if (s->downloading_index)
193     return "index";
194   return "fragment";
195 }
196 #endif
197
198 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
199 static gboolean
200 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
201 {
202   GstAdaptiveDemux *demux = stream->demux;
203   DownloadRequest *request = stream->download_request;
204   GstFlowReturn ret;
205
206   gchar *uri = request->uri;
207   gint64 range_start = request->range_start;
208   gint64 range_end = request->range_end;
209   gint64 chunk_size;
210   gint64 chunk_end;
211
212   if (range_end == -1)
213     return FALSE;               /* This was a request to the end, no more to load */
214
215   /* The size of the request that just completed: */
216   chunk_size = range_end + 1 - range_start;
217
218   if (request->content_received < chunk_size)
219     return FALSE;               /* Short read - we're done */
220
221   /* Accumulate the data we just fetched, to figure out the next
222    * request start position and update the target chunk size from
223    * the updated stream fragment info */
224   range_start += chunk_size;
225   range_end = stream->fragment.range_end;
226   chunk_size = stream->fragment.chunk_size;
227
228   if (chunk_size == 0)
229     return FALSE;               /* Sub-class doesn't want another chunk */
230
231   /* HTTP ranges are inclusive for the end */
232   if (chunk_size != -1) {
233     chunk_end = range_start + chunk_size - 1;
234     if (range_end != -1 && range_end < chunk_end)
235       chunk_end = range_end;
236   } else {
237     chunk_end = range_end;
238   }
239
240   GST_DEBUG_OBJECT (stream,
241       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
242       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
243
244   ret =
245       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
246       range_start, chunk_end);
247   if (ret != GST_FLOW_OK) {
248     GST_DEBUG_OBJECT (stream,
249         "Stopping stream due to begin download failure - ret %s",
250         gst_flow_get_name (ret));
251     gst_adaptive_demux2_stream_stop (stream);
252     return FALSE;
253   }
254
255   return TRUE;
256 }
257
258 static void
259 drain_inactive_tracks (GstAdaptiveDemux * demux,
260     GstAdaptiveDemux2Stream * stream)
261 {
262   GList *iter;
263
264   TRACKS_LOCK (demux);
265   for (iter = stream->tracks; iter; iter = iter->next) {
266     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
267     if (!track->selected) {
268       gst_adaptive_demux_track_drain_to (track,
269           demux->priv->global_output_position);
270     }
271   }
272
273   TRACKS_UNLOCK (demux);
274 }
275
276 /* Called to complete a download, either due to failure or completion
277  * Should set up the next download if necessary */
278 static void
279 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
280     stream, GstFlowReturn ret, GError * err)
281 {
282   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
283   GstAdaptiveDemux *demux = stream->demux;
284
285   GST_DEBUG_OBJECT (stream,
286       "%s download finish: %d %s - err: %p", uritype (stream), ret,
287       gst_flow_get_name (ret), err);
288
289   stream->download_finished = TRUE;
290
291   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
292    * which can look at the last_ret - so make sure it's stored before calling that.
293    * Also, for not-linked or other errors passed in that are going to make
294    * this stream stop, we'll need to store it */
295   stream->last_ret = ret;
296
297   if (err) {
298     g_clear_error (&stream->last_error);
299     stream->last_error = g_error_copy (err);
300   }
301
302   /* For actual errors, stop now, no need to call finish_fragment and get
303    * confused if it returns a non-error status, but if EOS was passed in,
304    * continue and check whether finish_fragment() says we've finished
305    * the whole manifest or just this fragment */
306   if (ret < 0 && ret != GST_FLOW_EOS) {
307     GST_INFO_OBJECT (stream,
308         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
309     gst_adaptive_demux2_stream_stop (stream);
310     return;
311   }
312
313   /* Handle all the possible flow returns here: */
314   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
315     /* We lost sync, seek back to live and return */
316     GST_WARNING_OBJECT (stream, "Lost sync when downloading");
317     gst_adaptive_demux_handle_lost_sync (demux);
318     return;
319   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
320     /* The sub-class wants to stop the fragment immediately */
321     stream->fragment.finished = TRUE;
322     ret = klass->finish_fragment (demux, stream);
323
324     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
325         gst_flow_get_name (ret));
326   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
327     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
328     /* Just mark the fragment as finished */
329     stream->fragment.finished = TRUE;
330     ret = GST_FLOW_OK;
331   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
332       || !klass->need_another_chunk (stream)
333       || stream->fragment.chunk_size == 0) {
334     stream->fragment.finished = TRUE;
335     ret = klass->finish_fragment (stream->demux, stream);
336
337     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
338         gst_flow_get_name (ret));
339   } else if (stream->fragment.chunk_size != 0
340       && schedule_another_chunk (stream)) {
341     /* Another download has already begun, no need to queue anything below */
342     return;
343   }
344
345   /* For HLS, we might be enqueueing data into tracks that aren't
346    * selected. Drain those ones out */
347   drain_inactive_tracks (stream->demux, stream);
348
349   /* Now that we've called finish_fragment we can clear these flags the
350    * sub-class might have checked */
351   if (stream->downloading_header) {
352     stream->need_header = FALSE;
353     stream->downloading_header = FALSE;
354   } else if (stream->downloading_index) {
355     stream->need_index = FALSE;
356     stream->downloading_index = FALSE;
357     /* Restart the fragment again now that header + index were loaded
358      * so that get_fragment_info() will be called again */
359     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
360   } else {
361     /* Finishing a fragment data download. Try for another */
362     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
363   }
364
365   /* if GST_FLOW_EOS was passed in that means this download is finished,
366    * but it's the result returned from finish_fragment() we really care
367    * about, as that tells us if the manifest has run out of fragments
368    * to load */
369   if (ret == GST_FLOW_EOS) {
370     stream->last_ret = ret;
371
372     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
373     return;
374   }
375
376   /* Now finally, if ret is anything other than success, we should stop this
377    * stream */
378   if (ret < 0) {
379     GST_DEBUG_OBJECT (stream,
380         "Stopping stream due to finish fragment ret %s",
381         gst_flow_get_name (ret));
382     gst_adaptive_demux2_stream_stop (stream);
383     return;
384   }
385
386   /* Clear the last_ret marker before starting a fresh download */
387   stream->last_ret = GST_FLOW_OK;
388
389   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
390   stream->pending_cb_id =
391       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
392       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
393       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
394 }
395
396 /* Must be called from the scheduler context */
397 void
398 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
399     GError * err)
400 {
401   GstAdaptiveDemux *demux = stream->demux;
402
403   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
404     return;
405
406   downloadhelper_cancel_request (demux->download_helper,
407       stream->download_request);
408
409   /* cancellation is async, so recycle our download request to avoid races */
410   download_request_unref (stream->download_request);
411   stream->download_request = download_request_new ();
412
413   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
414       err);
415 }
416
417 static void
418 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
419     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
420 {
421   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
422   GstClockTime offset =
423       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
424
425   stream->parse_segment = demux->segment;
426
427   /* The demuxer segment is just built from seek events, but for each stream
428    * we have to adjust segments according to the current period and the
429    * stream specific presentation time offset.
430    *
431    * For each period, buffer timestamps start again from 0. Additionally the
432    * buffer timestamps are shifted by the stream specific presentation time
433    * offset, so the first buffer timestamp of a period is 0 + presentation
434    * time offset. If the stream contains timestamps itself, this is also
435    * supposed to be the presentation time stored inside the stream.
436    *
437    * The stream time over periods is supposed to be continuous, that is the
438    * buffer timestamp 0 + presentation time offset should map to the start
439    * time of the current period.
440    *
441    *
442    * The adjustment of the stream segments as such works the following.
443    *
444    * If the demuxer segment start is bigger than the period start, this
445    * means that we have to drop some media at the beginning of the current
446    * period, e.g. because a seek into the middle of the period has
447    * happened. The amount of media to drop is the difference between the
448    * period start and the demuxer segment start, and as each period starts
449    * again from 0, this difference is going to be the actual stream's
450    * segment start. As all timestamps of the stream are shifted by the
451    * presentation time offset, we will also have to move the segment start
452    * by that offset.
453    *
454    * Likewise, the demuxer segment stop value is adjusted in the same
455    * fashion.
456    *
457    * Now the running time and stream time at the stream's segment start has
458    * to be the one that is stored inside the demuxer's segment, which means
459    * that segment.base and segment.time have to be copied over (done just
460    * above)
461    *
462    *
463    * If the demuxer segment start is smaller than the period start time,
464    * this means that the whole period is inside the segment. As each period
465    * starts timestamps from 0, and additionally timestamps are shifted by
466    * the presentation time offset, the stream's first timestamp (and as such
467    * the stream's segment start) has to be the presentation time offset.
468    * The stream time at the segment start is supposed to be the stream time
469    * of the period start according to the demuxer segment, so the stream
470    * segment's time would be set to that. The same goes for the stream
471    * segment's base, which is supposed to be the running time of the period
472    * start according to the demuxer's segment.
473    *
474    * The same logic applies for negative rates with the segment stop and
475    * the period stop time (which gets clamped).
476    *
477    *
478    * For the first case where not the complete period is inside the segment,
479    * the segment time and base as calculated by the second case would be
480    * equivalent.
481    */
482   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
483       &demux->segment);
484   GST_DEBUG_OBJECT (demux,
485       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
486       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
487   /* note for readers:
488    * Since stream->parse_segment is initially a copy of demux->segment,
489    * only the values that need updating are modified below. */
490   if (first_and_live) {
491     /* If first and live, demuxer did seek to the current position already */
492     stream->parse_segment.start = demux->segment.start - period_start + offset;
493     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
494       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
495     /* FIXME : Do we need to handle negative rates for this ? */
496     stream->parse_segment.position = stream->parse_segment.start;
497   } else if (demux->segment.start > period_start) {
498     /* seek within a period */
499     stream->parse_segment.start = demux->segment.start - period_start + offset;
500     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
501       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
502     if (stream->parse_segment.rate >= 0)
503       stream->parse_segment.position = offset;
504     else
505       stream->parse_segment.position = stream->parse_segment.stop;
506   } else {
507     stream->parse_segment.start = offset;
508     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
509       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
510     if (stream->parse_segment.rate >= 0) {
511       stream->parse_segment.position = offset;
512       stream->parse_segment.base =
513           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
514           period_start);
515     } else {
516       stream->parse_segment.position = stream->parse_segment.stop;
517       stream->parse_segment.base =
518           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
519           period_start + demux->segment.stop - demux->segment.start);
520     }
521     stream->parse_segment.time =
522         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
523         period_start);
524   }
525
526   stream->send_segment = TRUE;
527
528   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
529       &stream->parse_segment);
530 }
531
532 /* Segment lock hold */
533 static void
534 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
535     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
536 {
537   GstClockTimeDiff pos;
538
539   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
540       GST_STIME_ARGS (stream->fragment.stream_time));
541
542   pos = stream->fragment.stream_time;
543
544   if (GST_CLOCK_STIME_IS_VALID (pos)) {
545     GstClockTime offset =
546         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
547
548     pos += offset;
549
550     if (pos < 0) {
551       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
552       pos = 0;
553     }
554
555     GST_BUFFER_PTS (buffer) = pos;
556   } else {
557     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
558   }
559
560   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
561       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
562 }
563
564 /* Must be called from the scheduler context */
565 GstFlowReturn
566 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
567     GstBuffer * buffer)
568 {
569   GstAdaptiveDemux *demux = stream->demux;
570   GstFlowReturn ret = GST_FLOW_OK;
571   gboolean discont = FALSE;
572   /* Pending events */
573   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
574       NULL, *stream_start = NULL, *buffer_gap = NULL;
575   GList *pending_events = NULL;
576
577   if (stream->compute_segment) {
578     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
579         stream->first_and_live);
580     stream->compute_segment = FALSE;
581     stream->first_and_live = FALSE;
582   }
583
584   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
585     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
586     buffer_gap =
587         gst_event_new_gap (GST_BUFFER_PTS (buffer),
588         GST_BUFFER_DURATION (buffer));
589   }
590
591   if (stream->first_fragment_buffer) {
592     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
593     if (demux->segment.rate < 0)
594       /* Set DISCONT flag for every first buffer in reverse playback mode
595        * as each fragment for its own has to be reversed */
596       discont = TRUE;
597     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
598     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
599
600     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
601
602     /* Do we need to inject STREAM_START and SEGMENT events ?
603      *
604      * This can happen when a stream is restarted, and also when switching to a
605      * variant which needs a header (in which case downloading_header will be
606      * TRUE)
607      */
608     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
609       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
610       pending_segment = gst_event_new_segment (&stream->parse_segment);
611       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
612       stream->send_segment = FALSE;
613       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
614       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
615       stream_start = gst_event_new_stream_start ("bogus");
616       if (demux->have_group_id)
617         gst_event_set_group_id (stream_start, demux->group_id);
618     }
619   } else {
620     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
621   }
622   stream->first_fragment_buffer = FALSE;
623
624   if (stream->discont) {
625     discont = TRUE;
626     stream->discont = FALSE;
627   }
628
629   if (discont) {
630     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
631     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
632   } else {
633     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
634   }
635
636   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
637   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
638   if (G_UNLIKELY (stream->pending_caps)) {
639     pending_caps = gst_event_new_caps (stream->pending_caps);
640     gst_caps_unref (stream->pending_caps);
641     stream->pending_caps = NULL;
642   }
643
644   if (G_UNLIKELY (stream->pending_tags)) {
645     GstTagList *tags = stream->pending_tags;
646
647     stream->pending_tags = NULL;
648
649     if (tags)
650       pending_tags = gst_event_new_tag (tags);
651   }
652   if (G_UNLIKELY (stream->pending_events)) {
653     pending_events = stream->pending_events;
654     stream->pending_events = NULL;
655   }
656
657   /* Do not push events or buffers holding the manifest lock */
658   if (G_UNLIKELY (stream_start)) {
659     GST_DEBUG_OBJECT (stream,
660         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
661     gst_pad_send_event (stream->parsebin_sink, stream_start);
662   }
663   if (G_UNLIKELY (pending_caps)) {
664     GST_DEBUG_OBJECT (stream,
665         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
666     gst_pad_send_event (stream->parsebin_sink, pending_caps);
667   }
668   if (G_UNLIKELY (pending_segment)) {
669     GST_DEBUG_OBJECT (stream,
670         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
671     gst_pad_send_event (stream->parsebin_sink, pending_segment);
672   }
673   if (G_UNLIKELY (pending_tags)) {
674     GST_DEBUG_OBJECT (stream,
675         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
676     gst_pad_send_event (stream->parsebin_sink, pending_tags);
677   }
678   while (pending_events != NULL) {
679     GstEvent *event = pending_events->data;
680
681     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
682     if (!gst_pad_send_event (stream->parsebin_sink, event))
683       GST_ERROR_OBJECT (stream, "Failed to send pending event");
684
685     pending_events = g_list_delete_link (pending_events, pending_events);
686   }
687
688   GST_DEBUG_OBJECT (stream,
689       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
690       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
691       GST_BUFFER_OFFSET (buffer));
692
693   ret = gst_pad_chain (stream->parsebin_sink, buffer);
694
695   if (buffer_gap) {
696     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
697     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
698   }
699
700   if (G_UNLIKELY (stream->cancelled)) {
701     GST_LOG_OBJECT (demux, "Stream was cancelled");
702     return GST_FLOW_FLUSHING;
703   }
704
705   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
706
707   return ret;
708 }
709
710 static GstFlowReturn
711 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
712     GstBuffer * buffer)
713 {
714   GstAdaptiveDemux *demux = stream->demux;
715   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
716   GstFlowReturn ret = GST_FLOW_OK;
717
718   /* do not make any changes if the stream is cancelled */
719   if (G_UNLIKELY (stream->cancelled)) {
720     gst_buffer_unref (buffer);
721     return GST_FLOW_FLUSHING;
722   }
723
724   /* starting_fragment is set to TRUE at the beginning of
725    * _stream_download_fragment()
726    * /!\ If there is a header/index being downloaded, then this will
727    * be TRUE for the first one ... but FALSE for the remaining ones,
728    * including the *actual* fragment ! */
729   if (stream->starting_fragment) {
730     stream->starting_fragment = FALSE;
731     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
732       return GST_FLOW_ERROR;
733   }
734
735   stream->download_total_bytes += gst_buffer_get_size (buffer);
736
737   GST_TRACE_OBJECT (stream,
738       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
739       gst_buffer_get_size (buffer));
740
741   ret = klass->data_received (demux, stream, buffer);
742
743   if (ret != GST_FLOW_OK) {
744     GST_DEBUG_OBJECT (stream, "data_received returned %s",
745         gst_flow_get_name (ret));
746
747     if (ret == GST_FLOW_FLUSHING) {
748       /* do not make any changes if the stream is cancelled */
749       if (G_UNLIKELY (stream->cancelled)) {
750         return ret;
751       }
752     }
753
754     if (ret < GST_FLOW_EOS) {
755       GstEvent *eos = gst_event_new_eos ();
756       GST_ELEMENT_FLOW_ERROR (demux, ret);
757
758       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
759
760       /* TODO push this on all pads */
761       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
762       gst_pad_send_event (stream->parsebin_sink, eos);
763       ret = GST_FLOW_ERROR;
764
765       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
766     }
767   }
768
769   return ret;
770 }
771
772 /* Calculate the low and high download buffering watermarks
773  * in time as MAX (low-watermark-time, low-watermark-fragments) and
774  * MIN (high-watermark-time, high-watermark-fragments) respectively
775  */
776 static void
777 calculate_track_thresholds (GstAdaptiveDemux * demux,
778     GstAdaptiveDemux2Stream * stream,
779     GstClockTime fragment_duration, GstClockTime * low_threshold,
780     GstClockTime * high_threshold)
781 {
782   GST_OBJECT_LOCK (demux);
783   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
784   if (*low_threshold == 0 ||
785       (demux->buffering_low_watermark_time != 0
786           && demux->buffering_low_watermark_time > *low_threshold)) {
787     *low_threshold = demux->buffering_low_watermark_time;
788   }
789
790   if (*low_threshold == 0) {
791     /* This implies both low level properties were 0, the default is 10s unless
792      * the subclass has specified a recommended buffering threshold */
793     *low_threshold = 10 * GST_SECOND;
794     if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
795       *low_threshold =
796           MIN (stream->recommended_buffering_threshold, *low_threshold);
797   }
798
799   *high_threshold =
800       demux->buffering_high_watermark_fragments * fragment_duration;
801   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
802           && demux->buffering_high_watermark_time < *high_threshold)) {
803     *high_threshold = demux->buffering_high_watermark_time;
804   }
805
806   /* Make sure the low and high thresholds are less than the maximum buffering
807    * time */
808   if (*high_threshold == 0 ||
809       (demux->max_buffering_time != 0
810           && demux->max_buffering_time < *high_threshold)) {
811     *high_threshold = demux->max_buffering_time;
812   }
813
814   if (*low_threshold == 0 ||
815       (demux->max_buffering_time != 0
816           && demux->max_buffering_time < *low_threshold)) {
817     *low_threshold = demux->max_buffering_time;
818   }
819
820   /* Make sure the high threshold is higher than (or equal to) the low threshold.
821    * It's OK if they are the same, as the minimum download is 1 fragment */
822   if (*high_threshold == 0 ||
823       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
824     *high_threshold = *low_threshold;
825   }
826
827   GST_OBJECT_UNLOCK (demux);
828 }
829
830 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
831 static gboolean
832 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
833     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
834 {
835   gboolean need_to_wait = TRUE;
836   gboolean have_any_tracks = FALSE;
837   gboolean have_active_tracks = FALSE;
838   gboolean have_filled_inactive = FALSE;
839   gboolean update_buffering = FALSE;
840
841   GstClockTime low_threshold = 0, high_threshold = 0;
842   GList *iter;
843
844   calculate_track_thresholds (demux, stream, fragment_duration,
845       &low_threshold, &high_threshold);
846   GST_DEBUG_OBJECT (stream,
847       "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
848       " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
849       GST_TIME_ARGS (high_threshold),
850       GST_TIME_ARGS (stream->recommended_buffering_threshold));
851
852   /* If there are no tracks at all, don't wait. If there are no active
853    * tracks, keep filling until at least one track is full. If there
854    * are active tracks, require that they are all full */
855   TRACKS_LOCK (demux);
856   for (iter = stream->tracks; iter; iter = iter->next) {
857     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
858
859     /* Update the buffering threshold if it changed by more than a second */
860     if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
861       GST_DEBUG_OBJECT (stream, "Updating threshold");
862       /* The buffering threshold for this track changed, make sure to
863        * re-check buffering status */
864       update_buffering = TRUE;
865       track->buffering_threshold = low_threshold;
866     }
867
868     have_any_tracks = TRUE;
869     if (track->active)
870       have_active_tracks = TRUE;
871
872     if (track->level_time < high_threshold) {
873       if (track->active) {
874         need_to_wait = FALSE;
875         GST_DEBUG_OBJECT (stream,
876             "track %s has level %" GST_TIME_FORMAT
877             " - needs more data (target %" GST_TIME_FORMAT
878             ") (fragment duration %" GST_TIME_FORMAT ")",
879             track->stream_id, GST_TIME_ARGS (track->level_time),
880             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
881         continue;
882       }
883     } else if (!track->active) {        /* track is over threshold and inactive */
884       have_filled_inactive = TRUE;
885     }
886
887     GST_DEBUG_OBJECT (stream,
888         "track %s active (%d) has level %" GST_TIME_FORMAT,
889         track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
890   }
891
892   /* If there are no tracks, don't wait (we might need data to create them),
893    * or if there are active tracks that need more data to hit the threshold,
894    * don't wait. Otherwise it means all active tracks are full and we should wait */
895   if (!have_any_tracks) {
896     GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
897     need_to_wait = FALSE;
898   } else if (!have_active_tracks && !have_filled_inactive) {
899     GST_DEBUG_OBJECT (stream,
900         "have only inactive tracks that need more data - not waiting");
901     need_to_wait = FALSE;
902   }
903
904   if (need_to_wait) {
905     stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
906
907     for (iter = stream->tracks; iter; iter = iter->next) {
908       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
909
910       GST_DEBUG_OBJECT (stream,
911           "Waiting for queued data on track %s to drop below %"
912           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
913           track->stream_id, GST_TIME_ARGS (high_threshold),
914           GST_TIME_ARGS (fragment_duration));
915
916       /* we want to get woken up when the global output position reaches
917        * a point where the input is closer than "high_threshold" to needing
918        * output, so we can put more data in */
919       GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
920
921       if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
922           wakeup_time < stream->next_input_wakeup_time) {
923         stream->next_input_wakeup_time = wakeup_time;
924
925         GST_DEBUG_OBJECT (stream,
926             "Track %s level %" GST_TIME_FORMAT ". Input at position %"
927             GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
928             GST_TIME_FORMAT, track->stream_id,
929             GST_TIME_ARGS (track->level_time),
930             GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
931             GST_TIME_ARGS (demux->priv->global_output_position));
932       }
933     }
934
935     if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
936       GST_DEBUG_OBJECT (stream,
937           "Next input wakeup time is now %" GST_TIME_FORMAT,
938           GST_TIME_ARGS (stream->next_input_wakeup_time));
939
940       /* If this stream needs waking up sooner than any other current one,
941        * update the period wakeup time, which is what the output loop
942        * will check */
943       GstAdaptiveDemuxPeriod *period = stream->period;
944       if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
945           period->next_input_wakeup_time > stream->next_input_wakeup_time) {
946         period->next_input_wakeup_time = stream->next_input_wakeup_time;
947       }
948     }
949   }
950
951   if (update_buffering) {
952     demux_update_buffering_locked (demux);
953     demux_post_buffering_locked (demux);
954   }
955
956   TRACKS_UNLOCK (demux);
957
958   return need_to_wait;
959 }
960
961 static GstAdaptiveDemuxTrack *
962 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
963 {
964   GList *tmp;
965   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
966   gint num_possible_tracks = 0;
967   GstStream *gst_stream;
968   const gchar *internal_stream_id;
969   GstStreamType stream_type;
970
971   gst_stream = gst_pad_get_stream (pad);
972
973   /* FIXME: Edward: Added assertion because I don't see in what cases we would
974    * end up with a pad from parsebin which wouldn't have an associated
975    * GstStream. */
976   g_assert (gst_stream);
977
978   internal_stream_id = gst_stream_get_stream_id (gst_stream);
979   stream_type = gst_stream_get_stream_type (gst_stream);
980
981   GST_DEBUG_OBJECT (pad,
982       "Trying to match pad from parsebin with internal streamid %s and caps %"
983       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
984       gst_stream_get_caps (gst_stream));
985
986   /* Try to match directly by the track's pending upstream_stream_id */
987   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
988     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
989
990     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
991       continue;
992
993     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
994         track->upstream_stream_id);
995
996     if (first_matched_track == NULL)
997       first_matched_track = track;
998     num_possible_tracks++;
999
1000     /* If this track has a desired upstream stream id, match on it */
1001     if (track->upstream_stream_id == NULL ||
1002         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
1003       /* This is not the track for this pad */
1004       continue;
1005     }
1006
1007     /* Remove pending upstream id (we have matched it for the pending
1008      * stream_id) */
1009     g_free (track->upstream_stream_id);
1010     track->upstream_stream_id = NULL;
1011     found_track = track;
1012     break;
1013   }
1014
1015   if (found_track == NULL) {
1016     /* If we arrive here, it means the stream is switching pads after
1017      * the stream has already started running */
1018     /* No track is currently waiting for this particular stream id -
1019      * try and match an existing linked track. If there's only 1 possible,
1020      * take it. */
1021     if (num_possible_tracks == 1 && first_matched_track != NULL) {
1022       GST_LOG_OBJECT (pad, "Only one possible track to link to");
1023       found_track = first_matched_track;
1024     }
1025   }
1026
1027   if (found_track == NULL) {
1028     /* TODO: There are multiple possible tracks, need to match based
1029      * on language code and caps. Have you found a stream like this? */
1030     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1031   }
1032
1033   if (found_track != NULL) {
1034     if (!gst_pad_is_linked (found_track->sinkpad)) {
1035       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1036           found_track->sinkpad);
1037
1038       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1039         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1040         /* FIXME : Do something if we can't link ? */
1041       }
1042     } else {
1043       /* Store pad as pending link */
1044       GST_LOG_OBJECT (pad,
1045           "Remembering pad to be linked when current pad is unlinked");
1046       g_assert (found_track->pending_srcpad == NULL);
1047       found_track->pending_srcpad = gst_object_ref (pad);
1048     }
1049   }
1050
1051   if (gst_stream)
1052     gst_object_unref (gst_stream);
1053
1054   return found_track;
1055 }
1056
1057 static void
1058 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1059     GstAdaptiveDemux2Stream * stream)
1060 {
1061   GList *iter;
1062   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1063
1064   /* Remove from pending source pad */
1065   TRACKS_LOCK (stream->demux);
1066   for (iter = stream->tracks; iter; iter = iter->next) {
1067     GstAdaptiveDemuxTrack *track = iter->data;
1068     if (track->pending_srcpad == pad) {
1069       gst_object_unref (track->pending_srcpad);
1070       track->pending_srcpad = NULL;
1071       break;
1072     }
1073   }
1074   TRACKS_UNLOCK (stream->demux);
1075 }
1076
1077 static void
1078 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1079     GstAdaptiveDemux2Stream * stream)
1080 {
1081   if (!GST_PAD_IS_SRC (pad))
1082     return;
1083
1084   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1085
1086   if (!match_parsebin_to_track (stream, pad))
1087     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1088
1089   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1090 }
1091
1092 static void
1093 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1094     GstElement * element, GstAdaptiveDemux * demux)
1095 {
1096   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1097     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1098     g_object_set (element, "ignore-pcr", TRUE, NULL);
1099   }
1100 }
1101
1102 /* must be called with manifest_lock taken */
1103 static gboolean
1104 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1105 {
1106   GstAdaptiveDemux *demux = stream->demux;
1107
1108   if (stream->parsebin == NULL) {
1109     GstEvent *event;
1110
1111     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1112
1113     /* Workaround to detect if tsdemux is being used */
1114     if (tsdemux_type == 0) {
1115       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1116       if (element) {
1117         tsdemux_type = G_OBJECT_TYPE (element);
1118         gst_object_unref (element);
1119       }
1120     }
1121
1122     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1123     if (tsdemux_type)
1124       g_signal_connect (stream->parsebin, "deep-element-added",
1125           (GCallback) parsebin_deep_element_added_cb, demux);
1126     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1127     stream->parsebin_sink =
1128         gst_element_get_static_pad (stream->parsebin, "sink");
1129     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1130         G_CALLBACK (parsebin_pad_added_cb), stream);
1131     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1132         G_CALLBACK (parsebin_pad_removed_cb), stream);
1133
1134     event = gst_event_new_stream_start ("bogus");
1135     if (demux->have_group_id)
1136       gst_event_set_group_id (event, demux->group_id);
1137
1138     gst_pad_send_event (stream->parsebin_sink, event);
1139
1140     /* Not sure if these need to be outside the manifest lock: */
1141     gst_element_sync_state_with_parent (stream->parsebin);
1142     stream->last_status_code = 200;     /* default to OK */
1143   }
1144   return TRUE;
1145 }
1146
1147 static void
1148 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1149     GstAdaptiveDemux2Stream * stream)
1150 {
1151 }
1152
1153 static void
1154 on_download_error (DownloadRequest * request, DownloadRequestState state,
1155     GstAdaptiveDemux2Stream * stream)
1156 {
1157   GstAdaptiveDemux *demux = stream->demux;
1158   guint last_status_code = request->status_code;
1159   gboolean live;
1160
1161   stream->download_active = FALSE;
1162   stream->last_status_code = last_status_code;
1163
1164   GST_DEBUG_OBJECT (stream,
1165       "Download finished with error, request state %d http status %u, dc %d",
1166       request->state, last_status_code, stream->download_error_count);
1167
1168   live = gst_adaptive_demux_is_live (demux);
1169   if (((last_status_code / 100 == 4 && live)
1170           || last_status_code / 100 == 5)) {
1171     /* 4xx/5xx */
1172     /* if current position is before available start, switch to next */
1173     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1174       goto flushing;
1175
1176     if (live) {
1177       gint64 range_start, range_stop;
1178
1179       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1180               &range_stop))
1181         goto flushing;
1182
1183       if (demux->segment.position < range_start) {
1184         GstFlowReturn ret;
1185
1186         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1187         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1188
1189         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1190
1191         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1192         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1193             gst_flow_get_name (ret));
1194
1195         if (ret == GST_FLOW_OK)
1196           goto again;
1197
1198       } else if (demux->segment.position > range_stop) {
1199         /* wait a bit to be in range, we don't have any locks at that point */
1200         GstClockTime wait_time =
1201             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1202             stream);
1203         if (wait_time > 0) {
1204           GST_DEBUG_OBJECT (stream,
1205               "Download waiting for %" GST_TIME_FORMAT,
1206               GST_TIME_ARGS (wait_time));
1207           g_assert (stream->pending_cb_id == 0);
1208           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1209           stream->pending_cb_id =
1210               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1211               wait_time,
1212               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1213               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1214         }
1215       }
1216     }
1217
1218   flushing:
1219     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1220       /* looks like there is no way of knowing when a live stream has ended
1221        * Have to assume we are falling behind and cause a manifest reload */
1222       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1223       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1224       return;
1225     }
1226   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1227     /* If this is the last fragment, consider failures EOS and not actual
1228      * errors. Due to rounding errors in the durations, the last fragment
1229      * might not actually exist */
1230     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1231     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1232     return;
1233   } else {
1234     /* retry same segment */
1235     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1236       gst_adaptive_demux2_stream_error (stream);
1237       return;
1238     }
1239     goto again;
1240   }
1241
1242 again:
1243   /* wait a short time in case the server needs a bit to recover */
1244   GST_LOG_OBJECT (stream,
1245       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1246   g_assert (stream->pending_cb_id == 0);
1247   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1248       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1249       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1250 }
1251
1252 static void
1253 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1254     DownloadRequest * request)
1255 {
1256   GstClockTimeDiff last_download_duration;
1257   guint64 fragment_bytes_downloaded = request->content_received;
1258
1259   /* The stream last_download time tracks the full download time for now */
1260   stream->last_download_time =
1261       GST_CLOCK_DIFF (request->download_request_time,
1262       request->download_end_time);
1263
1264   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1265   last_download_duration =
1266       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1267
1268   /* If the entire response arrived in the first buffer
1269    * though, include the request time to get a valid
1270    * bitrate estimate */
1271   if (last_download_duration < 2 * stream->last_download_time)
1272     last_download_duration = stream->last_download_time;
1273
1274   if (last_download_duration > 0) {
1275     stream->last_bitrate =
1276         gst_util_uint64_scale (fragment_bytes_downloaded,
1277         8 * GST_SECOND, last_download_duration);
1278
1279     GST_DEBUG_OBJECT (stream,
1280         "Updated stream bitrate. %" G_GUINT64_FORMAT
1281         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1282         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1283         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1284   }
1285 }
1286
1287 static void
1288 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1289     GstAdaptiveDemux2Stream * stream)
1290 {
1291   GstAdaptiveDemux *demux = stream->demux;
1292   GstBuffer *buffer = download_request_take_buffer (request);
1293
1294   if (buffer) {
1295     GstFlowReturn ret;
1296
1297     GST_DEBUG_OBJECT (stream,
1298         "Handling buffer of %" G_GSIZE_FORMAT
1299         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1300         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1301         request->content_received, request->content_length);
1302
1303     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1304     download_request_unlock (request);
1305     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1306     download_request_lock (request);
1307
1308     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1309       return;
1310
1311     if (ret != GST_FLOW_OK) {
1312       GST_DEBUG_OBJECT (stream,
1313           "Buffer parsing returned: %d %s. Aborting download", ret,
1314           gst_flow_get_name (ret));
1315
1316       if (!stream->downloading_header && !stream->downloading_index)
1317         update_stream_bitrate (stream, request);
1318
1319       downloadhelper_cancel_request (demux->download_helper, request);
1320
1321       /* cancellation is async, so recycle our download request to avoid races */
1322       download_request_unref (stream->download_request);
1323       stream->download_request = download_request_new ();
1324
1325       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1326     }
1327   }
1328 }
1329
1330 static void
1331 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1332     GstAdaptiveDemux2Stream * stream)
1333 {
1334   GstFlowReturn ret = GST_FLOW_OK;
1335   GstBuffer *buffer;
1336
1337   stream->download_active = FALSE;
1338
1339   if (G_UNLIKELY (stream->cancelled))
1340     return;
1341
1342   GST_DEBUG_OBJECT (stream,
1343       "Stream %p %s download for %s is complete with state %d",
1344       stream, uritype (stream), request->uri, request->state);
1345
1346   /* Update bitrate for fragment downloads */
1347   if (!stream->downloading_header && !stream->downloading_index)
1348     update_stream_bitrate (stream, request);
1349
1350   buffer = download_request_take_buffer (request);
1351   if (buffer)
1352     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1353
1354   GST_DEBUG_OBJECT (stream,
1355       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1356       request->uri, ret, gst_flow_get_name (ret), stream->state);
1357
1358   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1359     return;
1360
1361   g_assert (stream->pending_cb_id == 0);
1362   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1363 }
1364
1365 /* must be called from the scheduler context
1366  *
1367  * Will submit the request only, which will complete asynchronously
1368  */
1369 static GstFlowReturn
1370 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1371     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1372     gint64 end)
1373 {
1374   DownloadRequest *request = stream->download_request;
1375
1376   GST_DEBUG_OBJECT (demux,
1377       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1378       uritype (stream), uri, start, end);
1379
1380   if (!gst_adaptive_demux2_stream_create_parser (stream))
1381     return GST_FLOW_ERROR;
1382
1383   /* Configure our download request */
1384   download_request_set_uri (request, uri, start, end);
1385
1386   if (stream->downloading_header || stream->downloading_index) {
1387     download_request_set_callbacks (request,
1388         (DownloadRequestEventCallback) on_download_complete,
1389         (DownloadRequestEventCallback) on_download_error,
1390         (DownloadRequestEventCallback) on_download_cancellation,
1391         (DownloadRequestEventCallback) NULL, stream);
1392   } else {
1393     download_request_set_callbacks (request,
1394         (DownloadRequestEventCallback) on_download_complete,
1395         (DownloadRequestEventCallback) on_download_error,
1396         (DownloadRequestEventCallback) on_download_cancellation,
1397         (DownloadRequestEventCallback) on_download_progress, stream);
1398   }
1399
1400   if (!downloadhelper_submit_request (demux->download_helper,
1401           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1402     return GST_FLOW_ERROR;
1403
1404   stream->download_active = TRUE;
1405
1406   return GST_FLOW_OK;
1407 }
1408
1409 /* must be called from the scheduler context */
1410 static GstFlowReturn
1411 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1412 {
1413   GstAdaptiveDemux *demux = stream->demux;
1414   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1415   gchar *url = NULL;
1416
1417   /* FIXME :  */
1418   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1419   if (stream->starting_fragment) {
1420     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1421         stream->fragment.uri ? "FRAGMENT " : "",
1422         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1423         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1424
1425     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1426         stream->fragment.index_uri == NULL)
1427       goto no_url_error;
1428
1429     stream->first_fragment_buffer = TRUE;
1430     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1431   }
1432
1433   if (stream->need_header && stream->fragment.header_uri != NULL) {
1434
1435     /* Set the need_index flag when we start the header if we'll also need the index */
1436     stream->need_index = (stream->fragment.index_uri != NULL);
1437
1438     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1439         G_GINT64_FORMAT, stream->fragment.header_uri,
1440         stream->fragment.header_range_start, stream->fragment.header_range_end);
1441
1442     stream->downloading_header = TRUE;
1443
1444     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1445         stream->fragment.header_uri, stream->fragment.header_range_start,
1446         stream->fragment.header_range_end);
1447   }
1448
1449   /* check if we have an index */
1450   if (stream->need_index && stream->fragment.index_uri != NULL) {
1451     GST_DEBUG_OBJECT (stream,
1452         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1453         stream->fragment.index_uri,
1454         stream->fragment.index_range_start, stream->fragment.index_range_end);
1455
1456     stream->downloading_index = TRUE;
1457
1458     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1459         stream->fragment.index_uri, stream->fragment.index_range_start,
1460         stream->fragment.index_range_end);
1461   }
1462
1463   url = stream->fragment.uri;
1464   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1465   if (!url)
1466     return GST_FLOW_OK;
1467
1468   /* Download the actual fragment, either in chunks or in one go */
1469   stream->first_fragment_buffer = TRUE;
1470
1471   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1472       && stream->fragment.chunk_size != 0) {
1473     /* Handle chunk downloading */
1474     gint64 range_start = stream->fragment.range_start;
1475     gint64 range_end = stream->fragment.range_end;
1476     gint chunk_size = stream->fragment.chunk_size;
1477     gint64 chunk_end;
1478
1479     /* HTTP ranges are inclusive for the end */
1480     if (chunk_size != -1) {
1481       chunk_end = range_start + chunk_size - 1;
1482       if (range_end != -1 && range_end < chunk_end)
1483         chunk_end = range_end;
1484     } else {
1485       chunk_end = range_end;
1486     }
1487
1488     GST_DEBUG_OBJECT (stream,
1489         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1490         url, range_start, chunk_end);
1491     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1492         range_start, chunk_end);
1493   }
1494
1495   /* regular single chunk download */
1496   stream->fragment.chunk_size = 0;
1497
1498   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1499       stream->fragment.range_start, stream->fragment.range_end);
1500
1501 no_url_error:
1502   {
1503     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1504         (_("Failed to get fragment URL.")),
1505         ("An error happened when getting fragment URL"));
1506     return GST_FLOW_ERROR;
1507   }
1508 }
1509
1510 static gboolean
1511 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1512     GstEvent * event)
1513 {
1514   gboolean ret = TRUE;
1515   GstPad *pad;
1516
1517   /* If there's a parsebin, push the event through it */
1518   if (stream->parsebin_sink != NULL) {
1519     pad = gst_object_ref (stream->parsebin_sink);
1520     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1521     ret = gst_pad_send_event (pad, gst_event_ref (event));
1522     gst_object_unref (pad);
1523   }
1524
1525   /* If the event is EOS, ensure that all tracks are EOS. This catches
1526    * the case where the parsebin hasn't parsed anything yet (we switched
1527    * to a never before used track right near EOS, or it didn't parse enough
1528    * to create pads and be able to send EOS through to the tracks.
1529    *
1530    * We don't need to care about any other events
1531    */
1532   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1533     GList *iter;
1534
1535     for (iter = stream->tracks; iter; iter = iter->next) {
1536       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1537       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1538     }
1539   }
1540
1541   gst_event_unref (event);
1542   return ret;
1543 }
1544
1545 static void
1546 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1547 {
1548   GstAdaptiveDemux *demux = stream->demux;
1549   GstMessage *msg;
1550   GstStructure *details;
1551
1552   details = gst_structure_new_empty ("details");
1553   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1554       stream->last_status_code, NULL);
1555
1556   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1557
1558   if (stream->last_error) {
1559     gchar *debug = g_strdup_printf ("Error on stream %s",
1560         GST_OBJECT_NAME (stream));
1561     msg =
1562         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1563         stream->last_error, debug, details);
1564     GST_ERROR_OBJECT (stream, "Download error: %s",
1565         stream->last_error->message);
1566     g_free (debug);
1567   } else {
1568     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1569         _("Couldn't download fragments"));
1570     msg =
1571         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1572         "Fragment downloading has failed consecutive times", details);
1573     g_error_free (err);
1574     GST_ERROR_OBJECT (stream,
1575         "Download error: Couldn't download fragments, too many failures");
1576   }
1577
1578   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1579 }
1580
1581 /* Called when a stream reaches the end of a playback segment */
1582 static void
1583 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1584 {
1585   GstAdaptiveDemux *demux = stream->demux;
1586   GstFlowReturn combined =
1587       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1588
1589   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1590
1591   if (gst_adaptive_demux_has_next_period (demux)) {
1592     if (combined == GST_FLOW_EOS) {
1593       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1594       gst_adaptive_demux_advance_period (demux);
1595     } else {
1596       /* Ensure the 'has_next_period' flag is set on the period before
1597        * pushing EOS to the stream, so that the output loop knows not
1598        * to actually output the event */
1599       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1600       demux->input_period->has_next_period = TRUE;
1601     }
1602   }
1603
1604   if (demux->priv->outputs) {
1605     GstEvent *eos = gst_event_new_eos ();
1606
1607     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1608     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1609
1610     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1611     gst_adaptive_demux2_stream_push_event (stream, eos);
1612   } else {
1613     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1614     gst_adaptive_demux2_stream_error (stream);
1615   }
1616 }
1617
1618 static gboolean
1619 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1620 {
1621   GstAdaptiveDemux *demux = stream->demux;
1622
1623   gboolean is_live = gst_adaptive_demux_is_live (demux);
1624
1625   stream->pending_cb_id = 0;
1626
1627   /* Refetch the playlist now after we waited */
1628   /* FIXME: Make this manifest update async and handle it on completion */
1629   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1630     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1631   }
1632
1633   /* We were called here from a timeout, so if the load function wants to loop
1634    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1635    * way */
1636   while (gst_adaptive_demux2_stream_next_download (stream));
1637
1638   return G_SOURCE_REMOVE;
1639 }
1640
1641 static gboolean
1642 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1643     * stream)
1644 {
1645   /* If the state already moved on, the stream was stopped, or another track
1646    * already woke up and needed data */
1647   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1648     return G_SOURCE_REMOVE;
1649
1650   GstAdaptiveDemux *demux = stream->demux;
1651   TRACKS_LOCK (demux);
1652
1653   GList *iter;
1654   for (iter = stream->tracks; iter; iter = iter->next) {
1655     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1656
1657     /* We need to recompute the track's level_time value, as the
1658      * global output position may have advanced and reduced the
1659      * value, even without anything being dequeued yet */
1660     gst_adaptive_demux_track_update_level_locked (track);
1661
1662     GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1663         " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1664         track->stream_id, GST_TIME_ARGS (track->level_time),
1665         GST_TIME_ARGS (track->input_time),
1666         GST_TIME_ARGS (demux->priv->global_output_position));
1667   }
1668   TRACKS_UNLOCK (demux);
1669
1670   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1671
1672   return G_SOURCE_REMOVE;
1673 }
1674
1675 void
1676 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1677     stream)
1678 {
1679   GstAdaptiveDemux *demux = stream->demux;
1680
1681   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1682
1683   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1684
1685   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1686       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1687       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1688 }
1689
1690 void
1691 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1692 {
1693   GstAdaptiveDemux *demux = stream->demux;
1694
1695   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1696     return;
1697
1698   g_assert (stream->pending_cb_id == 0);
1699
1700   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1701   stream->pending_cb_id =
1702       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1703       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1704       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1705 }
1706
1707 static void
1708 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1709     stream)
1710 {
1711   GstAdaptiveDemux *demux = stream->demux;
1712
1713   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1714           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1715
1716     if (!gst_adaptive_demux_has_next_period (demux)) {
1717       /* Wait only if we can ensure current manifest has been expired.
1718        * The meaning "we have next period" *WITH* EOS is that, current
1719        * period has been ended but we can continue to the next period */
1720       GST_DEBUG_OBJECT (stream,
1721           "Live playlist EOS - waiting for manifest update");
1722       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1723       /* Clear the stream last_ret EOS state, since we're not actually EOS */
1724       if (stream->last_ret == GST_FLOW_EOS)
1725         stream->last_ret = GST_FLOW_OK;
1726       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1727       return;
1728     }
1729
1730     if (stream->replaced)
1731       return;
1732   }
1733
1734   gst_adaptive_demux2_stream_end_of_manifest (stream);
1735 }
1736
1737 static gboolean
1738 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1739 {
1740   GstAdaptiveDemux *demux = stream->demux;
1741   gboolean live = gst_adaptive_demux_is_live (demux);
1742   GstFlowReturn ret = GST_FLOW_OK;
1743
1744   stream->pending_cb_id = 0;
1745
1746   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1747
1748   switch (stream->state) {
1749     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1750     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1751     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1752     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1753     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1754       /* Get information about the fragment to download */
1755       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1756       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1757       GST_DEBUG_OBJECT (stream,
1758           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1759
1760       if (ret == GST_FLOW_OK)
1761         stream->starting_fragment = TRUE;
1762       break;
1763     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1764       break;
1765     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1766       GST_ERROR_OBJECT (stream,
1767           "Unexpected stream state EOS. The stream should not be running now.");
1768       return FALSE;
1769     default:
1770       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1771       g_assert_not_reached ();
1772       break;
1773   }
1774
1775   if (ret == GST_FLOW_OK) {
1776     /* Wait for room in the output tracks */
1777     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1778             stream->fragment.duration)) {
1779       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1780       return FALSE;
1781     }
1782   }
1783
1784   if (ret == GST_FLOW_OK) {
1785     /* wait for live fragments to be available */
1786     if (live) {
1787       GstClockTime wait_time =
1788           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1789       if (wait_time > 0) {
1790         GST_DEBUG_OBJECT (stream,
1791             "Download waiting for %" GST_TIME_FORMAT,
1792             GST_TIME_ARGS (wait_time));
1793
1794         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1795
1796         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1797         g_assert (stream->pending_cb_id == 0);
1798         stream->pending_cb_id =
1799             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1800             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1801             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1802         return FALSE;
1803       }
1804     }
1805
1806     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1807       GST_ERROR_OBJECT (demux,
1808           "Failed to begin fragment download for stream %p", stream);
1809       return FALSE;
1810     }
1811   }
1812
1813   /* Cast to int avoids a compiler warning that
1814    * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1815   switch ((int) ret) {
1816     case GST_FLOW_OK:
1817       break;                    /* all is good, let's go */
1818     case GST_FLOW_EOS:
1819       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1820       stream->last_ret = ret;
1821       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1822       return FALSE;
1823     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1824       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1825       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1826       gst_adaptive_demux_handle_lost_sync (demux);
1827       return FALSE;
1828     case GST_FLOW_NOT_LINKED:
1829     {
1830       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1831
1832       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1833           == GST_FLOW_NOT_LINKED) {
1834         GST_ELEMENT_FLOW_ERROR (demux, ret);
1835       }
1836     }
1837       break;
1838
1839     case GST_FLOW_FLUSHING:
1840       /* Flushing is normal, the target track might have been unselected */
1841       if (G_UNLIKELY (stream->cancelled))
1842         return FALSE;
1843
1844     default:
1845       if (ret <= GST_FLOW_ERROR) {
1846         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1847         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1848           gst_adaptive_demux2_stream_error (stream);
1849           return FALSE;
1850         }
1851
1852         g_clear_error (&stream->last_error);
1853
1854         /* First try to update the playlist for non-live playlists
1855          * in case the URIs have changed in the meantime. But only
1856          * try it the first time, after that we're going to wait a
1857          * a bit to not flood the server */
1858         if (stream->download_error_count == 1
1859             && !gst_adaptive_demux_is_live (demux)) {
1860           /* TODO hlsdemux had more options to this function (boolean and err) */
1861           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1862             /* Retry immediately, the playlist actually has changed */
1863             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1864             return TRUE;
1865           }
1866         }
1867
1868         /* Wait half the fragment duration before retrying */
1869         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1870         g_assert (stream->pending_cb_id == 0);
1871         stream->pending_cb_id =
1872             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1873             stream->fragment.duration / 2,
1874             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1875             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1876         return FALSE;
1877       }
1878       break;
1879   }
1880
1881   return FALSE;
1882 }
1883
1884 static gboolean
1885 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1886 {
1887   GstAdaptiveDemux *demux = stream->demux;
1888   gboolean end_of_manifest = FALSE;
1889
1890   GST_LOG_OBJECT (stream, "Looking for next download");
1891
1892   /* Restarting download, figure out new position
1893    * FIXME : Move this to a separate function ? */
1894   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1895     GstClockTimeDiff stream_time = 0;
1896
1897     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1898
1899     if (stream->parsebin_sink != NULL) {
1900       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1901        * this is the first time we've used this stream, so it's all good) */
1902       gst_adaptive_demux2_stream_push_event (stream,
1903           gst_event_new_flush_start ());
1904       gst_adaptive_demux2_stream_push_event (stream,
1905           gst_event_new_flush_stop (FALSE));
1906     }
1907
1908     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1909     stream_time = stream->start_position;
1910
1911     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1912         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1913
1914     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1915       /* TODO check return */
1916       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1917           0, stream_time, &stream_time);
1918       stream->current_position = stream->start_position;
1919
1920       GST_DEBUG_OBJECT (stream,
1921           "stream_time after restart seek: %" GST_STIME_FORMAT
1922           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1923           GST_STIME_ARGS (stream->current_position));
1924     }
1925
1926     /* Trigger (re)computation of the parsebin input segment */
1927     stream->compute_segment = TRUE;
1928
1929     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1930
1931     stream->discont = TRUE;
1932     stream->need_header = TRUE;
1933     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1934   }
1935
1936   /* Check if we're done with our segment */
1937   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1938   if (demux->segment.rate > 0) {
1939     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1940         && stream->current_position >= demux->segment.stop) {
1941       end_of_manifest = TRUE;
1942     }
1943   } else {
1944     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1945         && stream->current_position <= demux->segment.start) {
1946       end_of_manifest = TRUE;
1947     }
1948   }
1949   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1950
1951   if (end_of_manifest) {
1952     gst_adaptive_demux2_stream_end_of_manifest (stream);
1953     return FALSE;
1954   }
1955   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1956 }
1957
1958 static gboolean
1959 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1960 {
1961   GstAdaptiveDemux *demux = stream->demux;
1962   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1963
1964   if (!klass->stream_can_start)
1965     return TRUE;
1966   return klass->stream_can_start (demux, stream);
1967 }
1968
1969 /**
1970  * gst_adaptive_demux2_stream_start:
1971  * @stream: a #GstAdaptiveDemux2Stream
1972  *
1973  * Start the given @stream. Should be called by subclasses that previously
1974  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1975  */
1976 void
1977 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1978 {
1979   GstAdaptiveDemux *demux;
1980
1981   g_return_if_fail (stream && stream->demux);
1982
1983   demux = stream->demux;
1984
1985   if (stream->pending_cb_id != 0 || stream->download_active) {
1986     /* There is already something active / pending on this stream */
1987     GST_LOG_OBJECT (stream, "Stream already running");
1988     return;
1989   }
1990
1991   /* Some streams require a delayed start, i.e. they need more information
1992    * before they can actually be started */
1993   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1994     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1995     return;
1996   }
1997
1998   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1999     GST_LOG_OBJECT (stream, "Stream is EOS already");
2000     return;
2001   }
2002
2003   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2004       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
2005     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
2006         stream->state);
2007     stream->cancelled = FALSE;
2008     stream->replaced = FALSE;
2009     stream->last_ret = GST_FLOW_OK;
2010
2011     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2012       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
2013   }
2014
2015   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
2016   stream->pending_cb_id =
2017       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2018       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
2019       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
2020 }
2021
2022 void
2023 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
2024 {
2025   GstAdaptiveDemux *demux = stream->demux;
2026
2027   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
2028   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
2029
2030   if (stream->pending_cb_id != 0) {
2031     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2032         stream->pending_cb_id);
2033     stream->pending_cb_id = 0;
2034   }
2035
2036   /* Cancel and drop the existing download request */
2037   downloadhelper_cancel_request (demux->download_helper,
2038       stream->download_request);
2039   download_request_unref (stream->download_request);
2040   stream->downloading_header = stream->downloading_index = FALSE;
2041   stream->download_request = download_request_new ();
2042   stream->download_active = FALSE;
2043
2044   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
2045 }
2046
2047 gboolean
2048 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2049 {
2050   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2051     return FALSE;
2052   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2053     return FALSE;
2054   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
2055     return FALSE;
2056   return TRUE;
2057 }
2058
2059 gboolean
2060 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
2061 {
2062   GList *tmp;
2063
2064   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2065     GstAdaptiveDemuxTrack *track = tmp->data;
2066     if (track->selected)
2067       return TRUE;
2068   }
2069
2070   return FALSE;
2071 }
2072
2073 /**
2074  * gst_adaptive_demux2_stream_is_selected:
2075  * @stream: A #GstAdaptiveDemux2Stream
2076  *
2077  * Returns: %TRUE if any of the tracks targetted by @stream is selected
2078  */
2079 gboolean
2080 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
2081 {
2082   gboolean ret;
2083
2084   g_return_val_if_fail (stream && stream->demux, FALSE);
2085
2086   TRACKS_LOCK (stream->demux);
2087   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2088   TRACKS_UNLOCK (stream->demux);
2089
2090   return ret;
2091 }