adaptivedemux2-stream: Silence a compiler warning
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62
63   stream->fragment_bitrates =
64       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
65
66   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
67
68   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
69 }
70
71 /* must be called with manifest_lock taken.
72  * It will temporarily drop the manifest_lock in order to join the task.
73  * It will join only the old_streams (the demux->streams are joined by
74  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
75  * called)
76  */
77 static void
78 gst_adaptive_demux2_stream_finalize (GObject * object)
79 {
80   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
81
82   GST_LOG_OBJECT (object, "Finalizing");
83
84   if (stream->download_request)
85     download_request_unref (stream->download_request);
86
87   stream->cancelled = TRUE;
88   g_clear_error (&stream->last_error);
89
90   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
91
92   if (stream->pending_events) {
93     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
94     stream->pending_events = NULL;
95   }
96
97   if (stream->parsebin_sink) {
98     gst_object_unref (stream->parsebin_sink);
99     stream->parsebin_sink = NULL;
100   }
101
102   if (stream->pad_added_id)
103     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
104   if (stream->pad_removed_id)
105     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
106
107   if (stream->parsebin != NULL) {
108     GST_LOG_OBJECT (stream, "Removing parsebin");
109     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
110     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
111     gst_object_unref (stream->parsebin);
112     stream->parsebin = NULL;
113   }
114
115   g_free (stream->fragment_bitrates);
116
117   g_list_free_full (stream->tracks,
118       (GDestroyNotify) gst_adaptive_demux_track_unref);
119
120   if (stream->pending_caps)
121     gst_caps_unref (stream->pending_caps);
122
123   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
124   g_clear_pointer (&stream->stream_collection, gst_object_unref);
125
126   G_OBJECT_CLASS (parent_class)->finalize (object);
127 }
128
129 /**
130  * gst_adaptive_demux2_stream_add_track:
131  * @stream: A #GstAdaptiveDemux2Stream
132  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
133  *
134  * This function is called when a subclass knows of a target @track that this
135  * @stream can provide.
136  */
137 gboolean
138 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
139     GstAdaptiveDemuxTrack * track)
140 {
141   g_return_val_if_fail (track != NULL, FALSE);
142
143   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
144       track->stream_id);
145   if (g_list_find (stream->tracks, track)) {
146     GST_DEBUG_OBJECT (stream->demux,
147         "track '%s' already handled by this stream", track->stream_id);
148     return FALSE;
149   }
150
151   stream->tracks =
152       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
153   if (stream->demux) {
154     g_assert (stream->period);
155     gst_adaptive_demux_period_add_track (stream->period, track);
156   }
157   return TRUE;
158 }
159
160 static gboolean
161 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
162 static gboolean
163 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
164 static void
165 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
166     stream);
167 static GstFlowReturn
168 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
169     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
170     gint64 end);
171
172 #ifndef GST_DISABLE_GST_DEBUG
173 static const char *
174 uritype (GstAdaptiveDemux2Stream * s)
175 {
176   if (s->downloading_header)
177     return "header";
178   if (s->downloading_index)
179     return "index";
180   return "fragment";
181 }
182 #endif
183
184 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
185 static gboolean
186 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
187 {
188   GstAdaptiveDemux *demux = stream->demux;
189   DownloadRequest *request = stream->download_request;
190   GstFlowReturn ret;
191
192   gchar *uri = request->uri;
193   gint64 range_start = request->range_start;
194   gint64 range_end = request->range_end;
195   gint64 chunk_size;
196   gint64 chunk_end;
197
198   if (range_end == -1)
199     return FALSE;               /* This was a request to the end, no more to load */
200
201   /* The size of the request that just completed: */
202   chunk_size = range_end + 1 - range_start;
203
204   if (request->content_received < chunk_size)
205     return FALSE;               /* Short read - we're done */
206
207   /* Accumulate the data we just fetched, to figure out the next
208    * request start position and update the target chunk size from
209    * the updated stream fragment info */
210   range_start += chunk_size;
211   range_end = stream->fragment.range_end;
212   chunk_size = stream->fragment.chunk_size;
213
214   if (chunk_size == 0)
215     return FALSE;               /* Sub-class doesn't want another chunk */
216
217   /* HTTP ranges are inclusive for the end */
218   if (chunk_size != -1) {
219     chunk_end = range_start + chunk_size - 1;
220     if (range_end != -1 && range_end < chunk_end)
221       chunk_end = range_end;
222   } else {
223     chunk_end = range_end;
224   }
225
226   GST_DEBUG_OBJECT (stream,
227       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
228       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
229
230   ret =
231       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
232       range_start, chunk_end);
233   if (ret != GST_FLOW_OK) {
234     GST_DEBUG_OBJECT (stream,
235         "Stopping stream due to begin download failure - ret %s",
236         gst_flow_get_name (ret));
237     gst_adaptive_demux2_stream_stop (stream);
238     return FALSE;
239   }
240
241   return TRUE;
242 }
243
244 static void
245 drain_inactive_tracks (GstAdaptiveDemux * demux,
246     GstAdaptiveDemux2Stream * stream)
247 {
248   GList *iter;
249
250   TRACKS_LOCK (demux);
251   for (iter = stream->tracks; iter; iter = iter->next) {
252     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
253     if (!track->selected) {
254       gst_adaptive_demux_track_drain_to (track,
255           demux->priv->global_output_position);
256     }
257   }
258
259   TRACKS_UNLOCK (demux);
260 }
261
262 /* Called to complete a download, either due to failure or completion
263  * Should set up the next download if necessary */
264 static void
265 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
266     stream, GstFlowReturn ret, GError * err)
267 {
268   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
269   GstAdaptiveDemux *demux = stream->demux;
270
271   GST_DEBUG_OBJECT (stream,
272       "%s download finish: %d %s - err: %p", uritype (stream), ret,
273       gst_flow_get_name (ret), err);
274
275   stream->download_finished = TRUE;
276
277   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
278    * which can look at the last_ret - so make sure it's stored before calling that.
279    * Also, for not-linked or other errors passed in that are going to make
280    * this stream stop, we'll need to store it */
281   stream->last_ret = ret;
282
283   if (err) {
284     g_clear_error (&stream->last_error);
285     stream->last_error = g_error_copy (err);
286   }
287
288   /* For actual errors, stop now, no need to call finish_fragment and get
289    * confused if it returns a non-error status, but if EOS was passed in,
290    * continue and check whether finish_fragment() says we've finished
291    * the whole manifest or just this fragment */
292   if (ret < 0 && ret != GST_FLOW_EOS) {
293     GST_INFO_OBJECT (stream,
294         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
295     gst_adaptive_demux2_stream_stop (stream);
296     return;
297   }
298
299   /* Handle all the possible flow returns here: */
300   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
301     /* We lost sync, seek back to live and return */
302     GST_WARNING_OBJECT (stream, "Lost sync when downloading");
303     gst_adaptive_demux_handle_lost_sync (demux);
304     return;
305   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
306     /* The sub-class wants to stop the fragment immediately */
307     stream->fragment.finished = TRUE;
308     ret = klass->finish_fragment (demux, stream);
309
310     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
311         gst_flow_get_name (ret));
312   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
313     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
314     /* Just mark the fragment as finished */
315     stream->fragment.finished = TRUE;
316     ret = GST_FLOW_OK;
317   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
318       || !klass->need_another_chunk (stream)
319       || stream->fragment.chunk_size == 0) {
320     stream->fragment.finished = TRUE;
321     ret = klass->finish_fragment (stream->demux, stream);
322
323     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
324         gst_flow_get_name (ret));
325   } else if (stream->fragment.chunk_size != 0
326       && schedule_another_chunk (stream)) {
327     /* Another download has already begun, no need to queue anything below */
328     return;
329   }
330
331   /* For HLS, we might be enqueueing data into tracks that aren't
332    * selected. Drain those ones out */
333   drain_inactive_tracks (stream->demux, stream);
334
335   /* Now that we've called finish_fragment we can clear these flags the
336    * sub-class might have checked */
337   if (stream->downloading_header) {
338     stream->need_header = FALSE;
339     stream->downloading_header = FALSE;
340   } else if (stream->downloading_index) {
341     stream->need_index = FALSE;
342     stream->downloading_index = FALSE;
343     /* Restart the fragment again now that header + index were loaded
344      * so that get_fragment_info() will be called again */
345     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
346   } else {
347     /* Finishing a fragment data download. Try for another */
348     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
349   }
350
351   /* if GST_FLOW_EOS was passed in that means this download is finished,
352    * but it's the result returned from finish_fragment() we really care
353    * about, as that tells us if the manifest has run out of fragments
354    * to load */
355   if (ret == GST_FLOW_EOS) {
356     stream->last_ret = ret;
357
358     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
359     return;
360   }
361
362   /* Now finally, if ret is anything other than success, we should stop this
363    * stream */
364   if (ret < 0) {
365     GST_DEBUG_OBJECT (stream,
366         "Stopping stream due to finish fragment ret %s",
367         gst_flow_get_name (ret));
368     gst_adaptive_demux2_stream_stop (stream);
369     return;
370   }
371
372   /* Clear the last_ret marker before starting a fresh download */
373   stream->last_ret = GST_FLOW_OK;
374
375   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
376   stream->pending_cb_id =
377       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
378       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
379       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
380 }
381
382 /* Must be called from the scheduler context */
383 void
384 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
385     GError * err)
386 {
387   GstAdaptiveDemux *demux = stream->demux;
388
389   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
390     return;
391
392   downloadhelper_cancel_request (demux->download_helper,
393       stream->download_request);
394
395   /* cancellation is async, so recycle our download request to avoid races */
396   download_request_unref (stream->download_request);
397   stream->download_request = download_request_new ();
398
399   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
400       err);
401 }
402
403 static void
404 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
405     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
406 {
407   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
408   GstClockTime offset =
409       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
410
411   stream->parse_segment = demux->segment;
412
413   /* The demuxer segment is just built from seek events, but for each stream
414    * we have to adjust segments according to the current period and the
415    * stream specific presentation time offset.
416    *
417    * For each period, buffer timestamps start again from 0. Additionally the
418    * buffer timestamps are shifted by the stream specific presentation time
419    * offset, so the first buffer timestamp of a period is 0 + presentation
420    * time offset. If the stream contains timestamps itself, this is also
421    * supposed to be the presentation time stored inside the stream.
422    *
423    * The stream time over periods is supposed to be continuous, that is the
424    * buffer timestamp 0 + presentation time offset should map to the start
425    * time of the current period.
426    *
427    *
428    * The adjustment of the stream segments as such works the following.
429    *
430    * If the demuxer segment start is bigger than the period start, this
431    * means that we have to drop some media at the beginning of the current
432    * period, e.g. because a seek into the middle of the period has
433    * happened. The amount of media to drop is the difference between the
434    * period start and the demuxer segment start, and as each period starts
435    * again from 0, this difference is going to be the actual stream's
436    * segment start. As all timestamps of the stream are shifted by the
437    * presentation time offset, we will also have to move the segment start
438    * by that offset.
439    *
440    * Likewise, the demuxer segment stop value is adjusted in the same
441    * fashion.
442    *
443    * Now the running time and stream time at the stream's segment start has
444    * to be the one that is stored inside the demuxer's segment, which means
445    * that segment.base and segment.time have to be copied over (done just
446    * above)
447    *
448    *
449    * If the demuxer segment start is smaller than the period start time,
450    * this means that the whole period is inside the segment. As each period
451    * starts timestamps from 0, and additionally timestamps are shifted by
452    * the presentation time offset, the stream's first timestamp (and as such
453    * the stream's segment start) has to be the presentation time offset.
454    * The stream time at the segment start is supposed to be the stream time
455    * of the period start according to the demuxer segment, so the stream
456    * segment's time would be set to that. The same goes for the stream
457    * segment's base, which is supposed to be the running time of the period
458    * start according to the demuxer's segment.
459    *
460    * The same logic applies for negative rates with the segment stop and
461    * the period stop time (which gets clamped).
462    *
463    *
464    * For the first case where not the complete period is inside the segment,
465    * the segment time and base as calculated by the second case would be
466    * equivalent.
467    */
468   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
469       &demux->segment);
470   GST_DEBUG_OBJECT (demux,
471       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
472       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
473   /* note for readers:
474    * Since stream->parse_segment is initially a copy of demux->segment,
475    * only the values that need updating are modified below. */
476   if (first_and_live) {
477     /* If first and live, demuxer did seek to the current position already */
478     stream->parse_segment.start = demux->segment.start - period_start + offset;
479     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
480       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
481     /* FIXME : Do we need to handle negative rates for this ? */
482     stream->parse_segment.position = stream->parse_segment.start;
483   } else if (demux->segment.start > period_start) {
484     /* seek within a period */
485     stream->parse_segment.start = demux->segment.start - period_start + offset;
486     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
487       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
488     if (stream->parse_segment.rate >= 0)
489       stream->parse_segment.position = offset;
490     else
491       stream->parse_segment.position = stream->parse_segment.stop;
492   } else {
493     stream->parse_segment.start = offset;
494     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
495       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
496     if (stream->parse_segment.rate >= 0) {
497       stream->parse_segment.position = offset;
498       stream->parse_segment.base =
499           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
500           period_start);
501     } else {
502       stream->parse_segment.position = stream->parse_segment.stop;
503       stream->parse_segment.base =
504           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
505           period_start + demux->segment.stop - demux->segment.start);
506     }
507     stream->parse_segment.time =
508         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
509         period_start);
510   }
511
512   stream->send_segment = TRUE;
513
514   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
515       &stream->parse_segment);
516 }
517
518 /* Segment lock hold */
519 static void
520 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
521     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
522 {
523   GstClockTimeDiff pos;
524
525   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
526       GST_STIME_ARGS (stream->fragment.stream_time));
527
528   pos = stream->fragment.stream_time;
529
530   if (GST_CLOCK_STIME_IS_VALID (pos)) {
531     GstClockTime offset =
532         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
533
534     pos += offset;
535
536     if (pos < 0) {
537       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
538       pos = 0;
539     }
540
541     GST_BUFFER_PTS (buffer) = pos;
542   } else {
543     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
544   }
545
546   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
547       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
548 }
549
550 /* Must be called from the scheduler context */
551 GstFlowReturn
552 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
553     GstBuffer * buffer)
554 {
555   GstAdaptiveDemux *demux = stream->demux;
556   GstFlowReturn ret = GST_FLOW_OK;
557   gboolean discont = FALSE;
558   /* Pending events */
559   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
560       NULL, *stream_start = NULL, *buffer_gap = NULL;
561   GList *pending_events = NULL;
562
563   if (stream->compute_segment) {
564     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
565         stream->first_and_live);
566     stream->compute_segment = FALSE;
567     stream->first_and_live = FALSE;
568   }
569
570   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
571     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
572     buffer_gap =
573         gst_event_new_gap (GST_BUFFER_PTS (buffer),
574         GST_BUFFER_DURATION (buffer));
575   }
576
577   if (stream->first_fragment_buffer) {
578     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
579     if (demux->segment.rate < 0)
580       /* Set DISCONT flag for every first buffer in reverse playback mode
581        * as each fragment for its own has to be reversed */
582       discont = TRUE;
583     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
584     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
585
586     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
587
588     /* Do we need to inject STREAM_START and SEGMENT events ?
589      *
590      * This can happen when a stream is restarted, and also when switching to a
591      * variant which needs a header (in which case downloading_header will be
592      * TRUE)
593      */
594     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
595       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
596       pending_segment = gst_event_new_segment (&stream->parse_segment);
597       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
598       stream->send_segment = FALSE;
599       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
600       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
601       stream_start = gst_event_new_stream_start ("bogus");
602       if (demux->have_group_id)
603         gst_event_set_group_id (stream_start, demux->group_id);
604     }
605   } else {
606     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
607   }
608   stream->first_fragment_buffer = FALSE;
609
610   if (stream->discont) {
611     discont = TRUE;
612     stream->discont = FALSE;
613   }
614
615   if (discont) {
616     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
617     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
618   } else {
619     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
620   }
621
622   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
623   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
624   if (G_UNLIKELY (stream->pending_caps)) {
625     pending_caps = gst_event_new_caps (stream->pending_caps);
626     gst_caps_unref (stream->pending_caps);
627     stream->pending_caps = NULL;
628   }
629
630   if (G_UNLIKELY (stream->pending_tags)) {
631     GstTagList *tags = stream->pending_tags;
632
633     stream->pending_tags = NULL;
634
635     if (tags)
636       pending_tags = gst_event_new_tag (tags);
637   }
638   if (G_UNLIKELY (stream->pending_events)) {
639     pending_events = stream->pending_events;
640     stream->pending_events = NULL;
641   }
642
643   /* Do not push events or buffers holding the manifest lock */
644   if (G_UNLIKELY (stream_start)) {
645     GST_DEBUG_OBJECT (stream,
646         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
647     gst_pad_send_event (stream->parsebin_sink, stream_start);
648   }
649   if (G_UNLIKELY (pending_caps)) {
650     GST_DEBUG_OBJECT (stream,
651         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
652     gst_pad_send_event (stream->parsebin_sink, pending_caps);
653   }
654   if (G_UNLIKELY (pending_segment)) {
655     GST_DEBUG_OBJECT (stream,
656         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
657     gst_pad_send_event (stream->parsebin_sink, pending_segment);
658   }
659   if (G_UNLIKELY (pending_tags)) {
660     GST_DEBUG_OBJECT (stream,
661         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
662     gst_pad_send_event (stream->parsebin_sink, pending_tags);
663   }
664   while (pending_events != NULL) {
665     GstEvent *event = pending_events->data;
666
667     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
668     if (!gst_pad_send_event (stream->parsebin_sink, event))
669       GST_ERROR_OBJECT (stream, "Failed to send pending event");
670
671     pending_events = g_list_delete_link (pending_events, pending_events);
672   }
673
674   GST_DEBUG_OBJECT (stream,
675       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
676       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
677       GST_BUFFER_OFFSET (buffer));
678
679   ret = gst_pad_chain (stream->parsebin_sink, buffer);
680
681   if (buffer_gap) {
682     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
683     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
684   }
685
686   if (G_UNLIKELY (stream->cancelled)) {
687     GST_LOG_OBJECT (demux, "Stream was cancelled");
688     return GST_FLOW_FLUSHING;
689   }
690
691   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
692
693   return ret;
694 }
695
696 static GstFlowReturn
697 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
698     GstBuffer * buffer)
699 {
700   GstAdaptiveDemux *demux = stream->demux;
701   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
702   GstFlowReturn ret = GST_FLOW_OK;
703
704   /* do not make any changes if the stream is cancelled */
705   if (G_UNLIKELY (stream->cancelled)) {
706     gst_buffer_unref (buffer);
707     return GST_FLOW_FLUSHING;
708   }
709
710   /* starting_fragment is set to TRUE at the beginning of
711    * _stream_download_fragment()
712    * /!\ If there is a header/index being downloaded, then this will
713    * be TRUE for the first one ... but FALSE for the remaining ones,
714    * including the *actual* fragment ! */
715   if (stream->starting_fragment) {
716     stream->starting_fragment = FALSE;
717     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
718       return GST_FLOW_ERROR;
719   }
720
721   stream->download_total_bytes += gst_buffer_get_size (buffer);
722
723   GST_TRACE_OBJECT (stream,
724       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
725       gst_buffer_get_size (buffer));
726
727   ret = klass->data_received (demux, stream, buffer);
728
729   if (ret != GST_FLOW_OK) {
730     GST_DEBUG_OBJECT (stream, "data_received returned %s",
731         gst_flow_get_name (ret));
732
733     if (ret == GST_FLOW_FLUSHING) {
734       /* do not make any changes if the stream is cancelled */
735       if (G_UNLIKELY (stream->cancelled)) {
736         return ret;
737       }
738     }
739
740     if (ret < GST_FLOW_EOS) {
741       GstEvent *eos = gst_event_new_eos ();
742       GST_ELEMENT_FLOW_ERROR (demux, ret);
743
744       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
745
746       /* TODO push this on all pads */
747       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
748       gst_pad_send_event (stream->parsebin_sink, eos);
749       ret = GST_FLOW_ERROR;
750
751       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
752     }
753   }
754
755   return ret;
756 }
757
758 /* Calculate the low and high download buffering watermarks
759  * in time as MAX (low-watermark-time, low-watermark-fragments) and
760  * MIN (high-watermark-time, high-watermark-fragments) respectively
761  */
762 static void
763 calculate_track_thresholds (GstAdaptiveDemux * demux,
764     GstClockTime fragment_duration, GstClockTime * low_threshold,
765     GstClockTime * high_threshold)
766 {
767   GST_OBJECT_LOCK (demux);
768   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
769   if (*low_threshold == 0 ||
770       (demux->buffering_low_watermark_time != 0
771           && demux->buffering_low_watermark_time > *low_threshold)) {
772     *low_threshold = demux->buffering_low_watermark_time;
773   }
774
775   *high_threshold =
776       demux->buffering_high_watermark_fragments * fragment_duration;
777   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
778           && demux->buffering_high_watermark_time < *high_threshold)) {
779     *high_threshold = demux->buffering_high_watermark_time;
780   }
781
782   /* Make sure the low and high thresholds are less than the maximum buffering
783    * time */
784   if (*high_threshold == 0 ||
785       (demux->max_buffering_time != 0
786           && demux->max_buffering_time < *high_threshold)) {
787     *high_threshold = demux->max_buffering_time;
788   }
789
790   if (*low_threshold == 0 ||
791       (demux->max_buffering_time != 0
792           && demux->max_buffering_time < *low_threshold)) {
793     *low_threshold = demux->max_buffering_time;
794   }
795
796   /* Make sure the high threshold is higher than (or equal to) the low threshold.
797    * It's OK if they are the same, as the minimum download is 1 fragment */
798   if (*high_threshold == 0 ||
799       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
800     *high_threshold = *low_threshold;
801   }
802   GST_OBJECT_UNLOCK (demux);
803 }
804
805 static gboolean
806 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
807     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
808 {
809   gboolean need_to_wait = TRUE;
810   gboolean have_any_tracks = FALSE;
811   gboolean have_active_tracks = FALSE;
812   gboolean have_filled_inactive = FALSE;
813   gboolean update_buffering = FALSE;
814
815   GstClockTime low_threshold = 0, high_threshold = 0;
816   GList *iter;
817
818   calculate_track_thresholds (demux, fragment_duration,
819       &low_threshold, &high_threshold);
820
821   /* If there are no tracks at all, don't wait. If there are no active
822    * tracks, keep filling until at least one track is full. If there
823    * are active tracks, require that they are all full */
824   TRACKS_LOCK (demux);
825   for (iter = stream->tracks; iter; iter = iter->next) {
826     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
827
828     /* Update the buffering threshold */
829     if (low_threshold != track->buffering_threshold) {
830       /* The buffering threshold for this track changed, make sure to
831        * re-check buffering status */
832       update_buffering = TRUE;
833       track->buffering_threshold = low_threshold;
834     }
835
836     have_any_tracks = TRUE;
837     if (track->active)
838       have_active_tracks = TRUE;
839
840     if (track->level_time < high_threshold) {
841       if (track->active) {
842         need_to_wait = FALSE;
843         GST_DEBUG_OBJECT (demux,
844             "stream %p track %s has level %" GST_TIME_FORMAT
845             " - needs more data (target %" GST_TIME_FORMAT
846             ") (fragment duration %" GST_TIME_FORMAT ")", stream,
847             track->stream_id, GST_TIME_ARGS (track->level_time),
848             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
849         continue;
850       }
851     } else if (!track->active) {        /* track is over threshold and inactive */
852       have_filled_inactive = TRUE;
853     }
854
855     GST_DEBUG_OBJECT (demux,
856         "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
857         stream, track->stream_id, track->active,
858         GST_TIME_ARGS (track->level_time));
859   }
860
861   /* If there are no tracks, don't wait (we might need data to create them),
862    * or if there are active tracks that need more data to hit the threshold,
863    * don't wait. Otherwise it means all active tracks are full and we should wait */
864   if (!have_any_tracks) {
865     GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
866     need_to_wait = FALSE;
867   } else if (!have_active_tracks && !have_filled_inactive) {
868     GST_DEBUG_OBJECT (demux,
869         "stream %p has inactive tracks that need more data - not waiting",
870         stream);
871     need_to_wait = FALSE;
872   }
873
874   if (need_to_wait) {
875     for (iter = stream->tracks; iter; iter = iter->next) {
876       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
877       track->waiting_del_level = high_threshold;
878       GST_DEBUG_OBJECT (demux,
879           "Waiting for queued data on stream %p track %s to drop below %"
880           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
881           stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
882           GST_TIME_ARGS (fragment_duration));
883     }
884   }
885
886   if (update_buffering) {
887     demux_update_buffering_locked (demux);
888     demux_post_buffering_locked (demux);
889   }
890
891   TRACKS_UNLOCK (demux);
892
893   return need_to_wait;
894 }
895
896 static GstAdaptiveDemuxTrack *
897 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
898 {
899   GList *tmp;
900   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
901   gint num_possible_tracks = 0;
902   GstStream *gst_stream;
903   const gchar *internal_stream_id;
904   GstStreamType stream_type;
905
906   gst_stream = gst_pad_get_stream (pad);
907
908   /* FIXME: Edward: Added assertion because I don't see in what cases we would
909    * end up with a pad from parsebin which wouldn't have an associated
910    * GstStream. */
911   g_assert (gst_stream);
912
913   internal_stream_id = gst_stream_get_stream_id (gst_stream);
914   stream_type = gst_stream_get_stream_type (gst_stream);
915
916   GST_DEBUG_OBJECT (pad,
917       "Trying to match pad from parsebin with internal streamid %s and caps %"
918       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
919       gst_stream_get_caps (gst_stream));
920
921   /* Try to match directly by the track's pending upstream_stream_id */
922   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
923     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
924
925     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
926       continue;
927
928     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
929         track->upstream_stream_id);
930
931     if (first_matched_track == NULL)
932       first_matched_track = track;
933     num_possible_tracks++;
934
935     /* If this track has a desired upstream stream id, match on it */
936     if (track->upstream_stream_id == NULL ||
937         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
938       /* This is not the track for this pad */
939       continue;
940     }
941
942     /* Remove pending upstream id (we have matched it for the pending
943      * stream_id) */
944     g_free (track->upstream_stream_id);
945     track->upstream_stream_id = NULL;
946     found_track = track;
947     break;
948   }
949
950   if (found_track == NULL) {
951     /* If we arrive here, it means the stream is switching pads after
952      * the stream has already started running */
953     /* No track is currently waiting for this particular stream id -
954      * try and match an existing linked track. If there's only 1 possible,
955      * take it. */
956     if (num_possible_tracks == 1 && first_matched_track != NULL) {
957       GST_LOG_OBJECT (pad, "Only one possible track to link to");
958       found_track = first_matched_track;
959     }
960   }
961
962   if (found_track == NULL) {
963     /* TODO: There are multiple possible tracks, need to match based
964      * on language code and caps. Have you found a stream like this? */
965     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
966   }
967
968   if (found_track != NULL) {
969     if (!gst_pad_is_linked (found_track->sinkpad)) {
970       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
971           found_track->sinkpad);
972
973       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
974         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
975         /* FIXME : Do something if we can't link ? */
976       }
977     } else {
978       /* Store pad as pending link */
979       GST_LOG_OBJECT (pad,
980           "Remembering pad to be linked when current pad is unlinked");
981       g_assert (found_track->pending_srcpad == NULL);
982       found_track->pending_srcpad = gst_object_ref (pad);
983     }
984   }
985
986   if (gst_stream)
987     gst_object_unref (gst_stream);
988
989   return found_track;
990 }
991
992 static void
993 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
994     GstAdaptiveDemux2Stream * stream)
995 {
996   GList *iter;
997   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
998
999   /* Remove from pending source pad */
1000   TRACKS_LOCK (stream->demux);
1001   for (iter = stream->tracks; iter; iter = iter->next) {
1002     GstAdaptiveDemuxTrack *track = iter->data;
1003     if (track->pending_srcpad == pad) {
1004       gst_object_unref (track->pending_srcpad);
1005       track->pending_srcpad = NULL;
1006       break;
1007     }
1008   }
1009   TRACKS_UNLOCK (stream->demux);
1010 }
1011
1012 static void
1013 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1014     GstAdaptiveDemux2Stream * stream)
1015 {
1016   if (!GST_PAD_IS_SRC (pad))
1017     return;
1018
1019   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1020
1021   if (!match_parsebin_to_track (stream, pad))
1022     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1023
1024   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1025 }
1026
1027 static void
1028 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1029     GstElement * element, GstAdaptiveDemux * demux)
1030 {
1031   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1032     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1033     g_object_set (element, "ignore-pcr", TRUE, NULL);
1034   }
1035 }
1036
1037 /* must be called with manifest_lock taken */
1038 static gboolean
1039 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1040 {
1041   GstAdaptiveDemux *demux = stream->demux;
1042
1043   if (stream->parsebin == NULL) {
1044     GstEvent *event;
1045
1046     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1047
1048     /* Workaround to detect if tsdemux is being used */
1049     if (tsdemux_type == 0) {
1050       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1051       if (element) {
1052         tsdemux_type = G_OBJECT_TYPE (element);
1053         gst_object_unref (element);
1054       }
1055     }
1056
1057     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1058     if (tsdemux_type)
1059       g_signal_connect (stream->parsebin, "deep-element-added",
1060           (GCallback) parsebin_deep_element_added_cb, demux);
1061     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1062     stream->parsebin_sink =
1063         gst_element_get_static_pad (stream->parsebin, "sink");
1064     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1065         G_CALLBACK (parsebin_pad_added_cb), stream);
1066     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1067         G_CALLBACK (parsebin_pad_removed_cb), stream);
1068
1069     event = gst_event_new_stream_start ("bogus");
1070     if (demux->have_group_id)
1071       gst_event_set_group_id (event, demux->group_id);
1072
1073     gst_pad_send_event (stream->parsebin_sink, event);
1074
1075     /* Not sure if these need to be outside the manifest lock: */
1076     gst_element_sync_state_with_parent (stream->parsebin);
1077     stream->last_status_code = 200;     /* default to OK */
1078   }
1079   return TRUE;
1080 }
1081
1082 static void
1083 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1084     GstAdaptiveDemux2Stream * stream)
1085 {
1086 }
1087
1088 static void
1089 on_download_error (DownloadRequest * request, DownloadRequestState state,
1090     GstAdaptiveDemux2Stream * stream)
1091 {
1092   GstAdaptiveDemux *demux = stream->demux;
1093   guint last_status_code = request->status_code;
1094   gboolean live;
1095
1096   stream->download_active = FALSE;
1097   stream->last_status_code = last_status_code;
1098
1099   GST_DEBUG_OBJECT (stream,
1100       "Download finished with error, request state %d http status %u, dc %d",
1101       request->state, last_status_code, stream->download_error_count);
1102
1103   live = gst_adaptive_demux_is_live (demux);
1104   if (((last_status_code / 100 == 4 && live)
1105           || last_status_code / 100 == 5)) {
1106     /* 4xx/5xx */
1107     /* if current position is before available start, switch to next */
1108     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1109       goto flushing;
1110
1111     if (live) {
1112       gint64 range_start, range_stop;
1113
1114       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1115               &range_stop))
1116         goto flushing;
1117
1118       if (demux->segment.position < range_start) {
1119         GstFlowReturn ret;
1120
1121         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1122         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1123
1124         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1125
1126         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1127         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1128             gst_flow_get_name (ret));
1129
1130         if (ret == GST_FLOW_OK)
1131           goto again;
1132
1133       } else if (demux->segment.position > range_stop) {
1134         /* wait a bit to be in range, we don't have any locks at that point */
1135         GstClockTime wait_time =
1136             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1137             stream);
1138         if (wait_time > 0) {
1139           GST_DEBUG_OBJECT (stream,
1140               "Download waiting for %" GST_TIME_FORMAT,
1141               GST_TIME_ARGS (wait_time));
1142           g_assert (stream->pending_cb_id == 0);
1143           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1144           stream->pending_cb_id =
1145               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1146               wait_time,
1147               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1148               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1149         }
1150       }
1151     }
1152
1153   flushing:
1154     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1155       /* looks like there is no way of knowing when a live stream has ended
1156        * Have to assume we are falling behind and cause a manifest reload */
1157       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1158       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1159       return;
1160     }
1161   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1162     /* If this is the last fragment, consider failures EOS and not actual
1163      * errors. Due to rounding errors in the durations, the last fragment
1164      * might not actually exist */
1165     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1166     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1167     return;
1168   } else {
1169     /* retry same segment */
1170     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1171       gst_adaptive_demux2_stream_error (stream);
1172       return;
1173     }
1174     goto again;
1175   }
1176
1177 again:
1178   /* wait a short time in case the server needs a bit to recover */
1179   GST_LOG_OBJECT (stream,
1180       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1181   g_assert (stream->pending_cb_id == 0);
1182   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1183       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1184       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1185 }
1186
1187 static void
1188 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1189     DownloadRequest * request)
1190 {
1191   GstClockTimeDiff last_download_duration;
1192   guint64 fragment_bytes_downloaded = request->content_received;
1193
1194   /* The stream last_download time tracks the full download time for now */
1195   stream->last_download_time =
1196       GST_CLOCK_DIFF (request->download_request_time,
1197       request->download_end_time);
1198
1199   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1200   last_download_duration =
1201       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1202
1203   /* If the entire response arrived in the first buffer
1204    * though, include the request time to get a valid
1205    * bitrate estimate */
1206   if (last_download_duration < 2 * stream->last_download_time)
1207     last_download_duration = stream->last_download_time;
1208
1209   if (last_download_duration > 0) {
1210     stream->last_bitrate =
1211         gst_util_uint64_scale (fragment_bytes_downloaded,
1212         8 * GST_SECOND, last_download_duration);
1213
1214     GST_DEBUG_OBJECT (stream,
1215         "Updated stream bitrate. %" G_GUINT64_FORMAT
1216         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1217         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1218         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1219   }
1220 }
1221
1222 static void
1223 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1224     GstAdaptiveDemux2Stream * stream)
1225 {
1226   GstAdaptiveDemux *demux = stream->demux;
1227   GstBuffer *buffer = download_request_take_buffer (request);
1228
1229   if (buffer) {
1230     GstFlowReturn ret;
1231
1232     GST_DEBUG_OBJECT (stream,
1233         "Handling buffer of %" G_GSIZE_FORMAT
1234         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1235         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1236         request->content_received, request->content_length);
1237
1238     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1239     download_request_unlock (request);
1240     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1241     download_request_lock (request);
1242
1243     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1244       return;
1245
1246     if (ret != GST_FLOW_OK) {
1247       GST_DEBUG_OBJECT (stream,
1248           "Buffer parsing returned: %d %s. Aborting download", ret,
1249           gst_flow_get_name (ret));
1250
1251       if (!stream->downloading_header && !stream->downloading_index)
1252         update_stream_bitrate (stream, request);
1253
1254       downloadhelper_cancel_request (demux->download_helper, request);
1255
1256       /* cancellation is async, so recycle our download request to avoid races */
1257       download_request_unref (stream->download_request);
1258       stream->download_request = download_request_new ();
1259
1260       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1261     }
1262   }
1263 }
1264
1265 static void
1266 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1267     GstAdaptiveDemux2Stream * stream)
1268 {
1269   GstFlowReturn ret = GST_FLOW_OK;
1270   GstBuffer *buffer;
1271
1272   stream->download_active = FALSE;
1273
1274   if (G_UNLIKELY (stream->cancelled))
1275     return;
1276
1277   GST_DEBUG_OBJECT (stream,
1278       "Stream %p %s download for %s is complete with state %d",
1279       stream, uritype (stream), request->uri, request->state);
1280
1281   /* Update bitrate for fragment downloads */
1282   if (!stream->downloading_header && !stream->downloading_index)
1283     update_stream_bitrate (stream, request);
1284
1285   buffer = download_request_take_buffer (request);
1286   if (buffer)
1287     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1288
1289   GST_DEBUG_OBJECT (stream,
1290       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1291       request->uri, ret, gst_flow_get_name (ret), stream->state);
1292
1293   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1294     return;
1295
1296   g_assert (stream->pending_cb_id == 0);
1297   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1298 }
1299
1300 /* must be called from the scheduler context
1301  *
1302  * Will submit the request only, which will complete asynchronously
1303  */
1304 static GstFlowReturn
1305 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1306     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1307     gint64 end)
1308 {
1309   DownloadRequest *request = stream->download_request;
1310
1311   GST_DEBUG_OBJECT (demux,
1312       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1313       uritype (stream), uri, start, end);
1314
1315   if (!gst_adaptive_demux2_stream_create_parser (stream))
1316     return GST_FLOW_ERROR;
1317
1318   /* Configure our download request */
1319   download_request_set_uri (request, uri, start, end);
1320
1321   if (stream->downloading_header || stream->downloading_index) {
1322     download_request_set_callbacks (request,
1323         (DownloadRequestEventCallback) on_download_complete,
1324         (DownloadRequestEventCallback) on_download_error,
1325         (DownloadRequestEventCallback) on_download_cancellation,
1326         (DownloadRequestEventCallback) NULL, stream);
1327   } else {
1328     download_request_set_callbacks (request,
1329         (DownloadRequestEventCallback) on_download_complete,
1330         (DownloadRequestEventCallback) on_download_error,
1331         (DownloadRequestEventCallback) on_download_cancellation,
1332         (DownloadRequestEventCallback) on_download_progress, stream);
1333   }
1334
1335   if (!downloadhelper_submit_request (demux->download_helper,
1336           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1337     return GST_FLOW_ERROR;
1338
1339   stream->download_active = TRUE;
1340
1341   return GST_FLOW_OK;
1342 }
1343
1344 /* must be called from the scheduler context */
1345 static GstFlowReturn
1346 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1347 {
1348   GstAdaptiveDemux *demux = stream->demux;
1349   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1350   gchar *url = NULL;
1351
1352   /* FIXME :  */
1353   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1354   if (stream->starting_fragment) {
1355     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1356         stream->fragment.uri ? "FRAGMENT " : "",
1357         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1358         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1359
1360     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1361         stream->fragment.index_uri == NULL)
1362       goto no_url_error;
1363
1364     stream->first_fragment_buffer = TRUE;
1365     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1366   }
1367
1368   if (stream->need_header && stream->fragment.header_uri != NULL) {
1369
1370     /* Set the need_index flag when we start the header if we'll also need the index */
1371     stream->need_index = (stream->fragment.index_uri != NULL);
1372
1373     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1374         G_GINT64_FORMAT, stream->fragment.header_uri,
1375         stream->fragment.header_range_start, stream->fragment.header_range_end);
1376
1377     stream->downloading_header = TRUE;
1378
1379     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1380         stream->fragment.header_uri, stream->fragment.header_range_start,
1381         stream->fragment.header_range_end);
1382   }
1383
1384   /* check if we have an index */
1385   if (stream->need_index && stream->fragment.index_uri != NULL) {
1386     GST_DEBUG_OBJECT (stream,
1387         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1388         stream->fragment.index_uri,
1389         stream->fragment.index_range_start, stream->fragment.index_range_end);
1390
1391     stream->downloading_index = TRUE;
1392
1393     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1394         stream->fragment.index_uri, stream->fragment.index_range_start,
1395         stream->fragment.index_range_end);
1396   }
1397
1398   url = stream->fragment.uri;
1399   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1400   if (!url)
1401     return GST_FLOW_OK;
1402
1403   /* Download the actual fragment, either in chunks or in one go */
1404   stream->first_fragment_buffer = TRUE;
1405
1406   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1407       && stream->fragment.chunk_size != 0) {
1408     /* Handle chunk downloading */
1409     gint64 range_start = stream->fragment.range_start;
1410     gint64 range_end = stream->fragment.range_end;
1411     gint chunk_size = stream->fragment.chunk_size;
1412     gint64 chunk_end;
1413
1414     /* HTTP ranges are inclusive for the end */
1415     if (chunk_size != -1) {
1416       chunk_end = range_start + chunk_size - 1;
1417       if (range_end != -1 && range_end < chunk_end)
1418         chunk_end = range_end;
1419     } else {
1420       chunk_end = range_end;
1421     }
1422
1423     GST_DEBUG_OBJECT (stream,
1424         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1425         url, range_start, chunk_end);
1426     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1427         range_start, chunk_end);
1428   }
1429
1430   /* regular single chunk download */
1431   stream->fragment.chunk_size = 0;
1432
1433   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1434       stream->fragment.range_start, stream->fragment.range_end);
1435
1436 no_url_error:
1437   {
1438     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1439         (_("Failed to get fragment URL.")),
1440         ("An error happened when getting fragment URL"));
1441     return GST_FLOW_ERROR;
1442   }
1443 }
1444
1445 static gboolean
1446 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1447     GstEvent * event)
1448 {
1449   gboolean ret = TRUE;
1450   GstPad *pad;
1451
1452   /* If there's a parsebin, push the event through it */
1453   if (stream->parsebin_sink != NULL) {
1454     pad = gst_object_ref (stream->parsebin_sink);
1455     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1456     ret = gst_pad_send_event (pad, gst_event_ref (event));
1457     gst_object_unref (pad);
1458   }
1459
1460   /* If the event is EOS, ensure that all tracks are EOS. This catches
1461    * the case where the parsebin hasn't parsed anything yet (we switched
1462    * to a never before used track right near EOS, or it didn't parse enough
1463    * to create pads and be able to send EOS through to the tracks.
1464    *
1465    * We don't need to care about any other events
1466    */
1467   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1468     GList *iter;
1469
1470     for (iter = stream->tracks; iter; iter = iter->next) {
1471       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1472       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1473     }
1474   }
1475
1476   gst_event_unref (event);
1477   return ret;
1478 }
1479
1480 static void
1481 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1482 {
1483   GstAdaptiveDemux *demux = stream->demux;
1484   GstMessage *msg;
1485   GstStructure *details;
1486
1487   details = gst_structure_new_empty ("details");
1488   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1489       stream->last_status_code, NULL);
1490
1491   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1492
1493   if (stream->last_error) {
1494     gchar *debug = g_strdup_printf ("Error on stream %s",
1495         GST_OBJECT_NAME (stream));
1496     msg =
1497         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1498         stream->last_error, debug, details);
1499     GST_ERROR_OBJECT (stream, "Download error: %s",
1500         stream->last_error->message);
1501     g_free (debug);
1502   } else {
1503     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1504         _("Couldn't download fragments"));
1505     msg =
1506         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1507         "Fragment downloading has failed consecutive times", details);
1508     g_error_free (err);
1509     GST_ERROR_OBJECT (stream,
1510         "Download error: Couldn't download fragments, too many failures");
1511   }
1512
1513   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1514 }
1515
1516 /* Called when a stream reaches the end of a playback segment */
1517 static void
1518 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1519 {
1520   GstAdaptiveDemux *demux = stream->demux;
1521   GstFlowReturn combined =
1522       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1523
1524   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1525
1526   if (gst_adaptive_demux_has_next_period (demux)) {
1527     if (combined == GST_FLOW_EOS) {
1528       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1529       gst_adaptive_demux_advance_period (demux);
1530     } else {
1531       /* Ensure the 'has_next_period' flag is set on the period before
1532        * pushing EOS to the stream, so that the output loop knows not
1533        * to actually output the event */
1534       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1535       demux->input_period->has_next_period = TRUE;
1536     }
1537   }
1538
1539   if (demux->priv->outputs) {
1540     GstEvent *eos = gst_event_new_eos ();
1541
1542     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1543     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1544
1545     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1546     gst_adaptive_demux2_stream_push_event (stream, eos);
1547   } else {
1548     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1549     gst_adaptive_demux2_stream_error (stream);
1550   }
1551 }
1552
1553 static gboolean
1554 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1555 {
1556   GstAdaptiveDemux *demux = stream->demux;
1557
1558   gboolean is_live = gst_adaptive_demux_is_live (demux);
1559
1560   stream->pending_cb_id = 0;
1561
1562   /* Refetch the playlist now after we waited */
1563   /* FIXME: Make this manifest update async and handle it on completion */
1564   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1565     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1566   }
1567
1568   /* We were called here from a timeout, so if the load function wants to loop
1569    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1570    * way */
1571   while (gst_adaptive_demux2_stream_next_download (stream));
1572
1573   return G_SOURCE_REMOVE;
1574 }
1575
1576 static gboolean
1577 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1578     * stream)
1579 {
1580   /* If the state already moved on, the stream was stopped, or another track
1581    * already woke up and needed data */
1582   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1583     return G_SOURCE_REMOVE;
1584
1585   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1586
1587   return G_SOURCE_REMOVE;
1588 }
1589
1590 void
1591 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1592     stream)
1593 {
1594   GstAdaptiveDemux *demux = stream->demux;
1595   GList *iter;
1596
1597   for (iter = stream->tracks; iter; iter = iter->next) {
1598     GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
1599     tmp_track->waiting_del_level = 0;
1600   }
1601
1602   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1603
1604   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1605       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1606       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1607 }
1608
1609 void
1610 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1611 {
1612   GstAdaptiveDemux *demux = stream->demux;
1613
1614   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1615     return;
1616
1617   g_assert (stream->pending_cb_id == 0);
1618
1619   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1620   stream->pending_cb_id =
1621       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1622       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1623       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1624 }
1625
1626 static void
1627 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1628     stream)
1629 {
1630   GstAdaptiveDemux *demux = stream->demux;
1631
1632   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1633           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1634
1635     if (!gst_adaptive_demux_has_next_period (demux)) {
1636       /* Wait only if we can ensure current manifest has been expired.
1637        * The meaning "we have next period" *WITH* EOS is that, current
1638        * period has been ended but we can continue to the next period */
1639       GST_DEBUG_OBJECT (stream,
1640           "Live playlist EOS - waiting for manifest update");
1641       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1642       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1643       return;
1644     }
1645
1646     if (stream->replaced)
1647       return;
1648   }
1649
1650   gst_adaptive_demux2_stream_end_of_manifest (stream);
1651 }
1652
1653 static gboolean
1654 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1655 {
1656   GstAdaptiveDemux *demux = stream->demux;
1657   gboolean live = gst_adaptive_demux_is_live (demux);
1658   GstFlowReturn ret = GST_FLOW_OK;
1659
1660   stream->pending_cb_id = 0;
1661
1662   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1663
1664   switch (stream->state) {
1665     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1666     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1667     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1668     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1669     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1670       /* Get information about the fragment to download */
1671       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1672       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1673       GST_DEBUG_OBJECT (stream,
1674           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1675
1676       if (ret == GST_FLOW_OK)
1677         stream->starting_fragment = TRUE;
1678       break;
1679     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1680       break;
1681     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1682       GST_ERROR_OBJECT (stream,
1683           "Unexpected stream state EOS. The stream should not be running now.");
1684       return FALSE;
1685     default:
1686       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1687       g_assert_not_reached ();
1688       break;
1689   }
1690
1691   if (ret == GST_FLOW_OK) {
1692     /* Wait for room in the output tracks */
1693     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1694             stream->fragment.duration)) {
1695       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1696       return FALSE;
1697     }
1698   }
1699
1700   if (ret == GST_FLOW_OK) {
1701     /* wait for live fragments to be available */
1702     if (live) {
1703       GstClockTime wait_time =
1704           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1705       if (wait_time > 0) {
1706         GST_DEBUG_OBJECT (stream,
1707             "Download waiting for %" GST_TIME_FORMAT,
1708             GST_TIME_ARGS (wait_time));
1709
1710         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1711
1712         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1713         g_assert (stream->pending_cb_id == 0);
1714         stream->pending_cb_id =
1715             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1716             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1717             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1718         return FALSE;
1719       }
1720     }
1721
1722     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1723       GST_ERROR_OBJECT (demux,
1724           "Failed to begin fragment download for stream %p", stream);
1725       return FALSE;
1726     }
1727   }
1728
1729   /* Cast to int avoids a compiler warning that
1730    * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1731   switch ((int) ret) {
1732     case GST_FLOW_OK:
1733       break;                    /* all is good, let's go */
1734     case GST_FLOW_EOS:
1735       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1736       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1737       return FALSE;
1738     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1739       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1740       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1741       gst_adaptive_demux_handle_lost_sync (demux);
1742       return FALSE;
1743     case GST_FLOW_NOT_LINKED:
1744     {
1745       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1746
1747       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1748           == GST_FLOW_NOT_LINKED) {
1749         GST_ELEMENT_FLOW_ERROR (demux, ret);
1750       }
1751     }
1752       break;
1753
1754     case GST_FLOW_FLUSHING:
1755       /* Flushing is normal, the target track might have been unselected */
1756       if (G_UNLIKELY (stream->cancelled))
1757         return FALSE;
1758
1759     default:
1760       if (ret <= GST_FLOW_ERROR) {
1761         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1762         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1763           gst_adaptive_demux2_stream_error (stream);
1764           return FALSE;
1765         }
1766
1767         g_clear_error (&stream->last_error);
1768
1769         /* First try to update the playlist for non-live playlists
1770          * in case the URIs have changed in the meantime. But only
1771          * try it the first time, after that we're going to wait a
1772          * a bit to not flood the server */
1773         if (stream->download_error_count == 1
1774             && !gst_adaptive_demux_is_live (demux)) {
1775           /* TODO hlsdemux had more options to this function (boolean and err) */
1776           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1777             /* Retry immediately, the playlist actually has changed */
1778             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1779             return TRUE;
1780           }
1781         }
1782
1783         /* Wait half the fragment duration before retrying */
1784         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1785         g_assert (stream->pending_cb_id == 0);
1786         stream->pending_cb_id =
1787             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1788             stream->fragment.duration / 2,
1789             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1790             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1791         return FALSE;
1792       }
1793       break;
1794   }
1795
1796   return FALSE;
1797 }
1798
1799 static gboolean
1800 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1801 {
1802   GstAdaptiveDemux *demux = stream->demux;
1803   gboolean end_of_manifest = FALSE;
1804
1805   GST_LOG_OBJECT (stream, "Looking for next download");
1806
1807   /* Restarting download, figure out new position
1808    * FIXME : Move this to a separate function ? */
1809   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1810     GstClockTimeDiff stream_time = 0;
1811
1812     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1813
1814     if (stream->parsebin_sink != NULL) {
1815       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1816        * this is the first time we've used this stream, so it's all good) */
1817       gst_adaptive_demux2_stream_push_event (stream,
1818           gst_event_new_flush_start ());
1819       gst_adaptive_demux2_stream_push_event (stream,
1820           gst_event_new_flush_stop (FALSE));
1821     }
1822
1823     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1824     stream_time = stream->start_position;
1825
1826     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1827         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1828
1829     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1830       /* TODO check return */
1831       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1832           0, stream_time, &stream_time);
1833       stream->current_position = stream->start_position;
1834
1835       GST_DEBUG_OBJECT (stream,
1836           "stream_time after restart seek: %" GST_STIME_FORMAT
1837           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1838           GST_STIME_ARGS (stream->current_position));
1839     }
1840
1841     /* Trigger (re)computation of the parsebin input segment */
1842     stream->compute_segment = TRUE;
1843
1844     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1845
1846     stream->discont = TRUE;
1847     stream->need_header = TRUE;
1848     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1849   }
1850
1851   /* Check if we're done with our segment */
1852   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1853   if (demux->segment.rate > 0) {
1854     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1855         && stream->current_position >= demux->segment.stop) {
1856       end_of_manifest = TRUE;
1857     }
1858   } else {
1859     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1860         && stream->current_position <= demux->segment.start) {
1861       end_of_manifest = TRUE;
1862     }
1863   }
1864   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1865
1866   if (end_of_manifest) {
1867     gst_adaptive_demux2_stream_end_of_manifest (stream);
1868     return FALSE;
1869   }
1870   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1871 }
1872
1873 static gboolean
1874 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1875 {
1876   GstAdaptiveDemux *demux = stream->demux;
1877   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1878
1879   if (!klass->stream_can_start)
1880     return TRUE;
1881   return klass->stream_can_start (demux, stream);
1882 }
1883
1884 /**
1885  * gst_adaptive_demux2_stream_start:
1886  * @stream: a #GstAdaptiveDemux2Stream
1887  *
1888  * Start the given @stream. Should be called by subclasses that previously
1889  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1890  */
1891 void
1892 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1893 {
1894   GstAdaptiveDemux *demux;
1895
1896   g_return_if_fail (stream && stream->demux);
1897
1898   demux = stream->demux;
1899
1900   if (stream->pending_cb_id != 0 || stream->download_active) {
1901     /* There is already something active / pending on this stream */
1902     GST_LOG_OBJECT (stream, "Stream already running");
1903     return;
1904   }
1905
1906   /* Some streams require a delayed start, i.e. they need more information
1907    * before they can actually be started */
1908   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1909     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1910     return;
1911   }
1912
1913   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1914     GST_LOG_OBJECT (stream, "Stream is EOS already");
1915     return;
1916   }
1917
1918   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1919       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
1920     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
1921         stream->state);
1922     stream->cancelled = FALSE;
1923     stream->replaced = FALSE;
1924     stream->last_ret = GST_FLOW_OK;
1925
1926     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1927       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1928   }
1929
1930   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
1931   stream->pending_cb_id =
1932       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1933       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
1934       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1935 }
1936
1937 void
1938 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
1939 {
1940   GstAdaptiveDemux *demux = stream->demux;
1941
1942   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
1943   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1944
1945   if (stream->pending_cb_id != 0) {
1946     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
1947         stream->pending_cb_id);
1948     stream->pending_cb_id = 0;
1949   }
1950
1951   /* Cancel and drop the existing download request */
1952   downloadhelper_cancel_request (demux->download_helper,
1953       stream->download_request);
1954   download_request_unref (stream->download_request);
1955   stream->downloading_header = stream->downloading_index = FALSE;
1956   stream->download_request = download_request_new ();
1957   stream->download_active = FALSE;
1958 }
1959
1960 gboolean
1961 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
1962 {
1963   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1964     return FALSE;
1965   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
1966     return FALSE;
1967   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
1968     return FALSE;
1969   return TRUE;
1970 }
1971
1972 gboolean
1973 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
1974 {
1975   GList *tmp;
1976
1977   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
1978     GstAdaptiveDemuxTrack *track = tmp->data;
1979     if (track->selected)
1980       return TRUE;
1981   }
1982
1983   return FALSE;
1984 }
1985
1986 /**
1987  * gst_adaptive_demux2_stream_is_selected:
1988  * @stream: A #GstAdaptiveDemux2Stream
1989  *
1990  * Returns: %TRUE if any of the tracks targetted by @stream is selected
1991  */
1992 gboolean
1993 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
1994 {
1995   gboolean ret;
1996
1997   g_return_val_if_fail (stream && stream->demux, FALSE);
1998
1999   TRACKS_LOCK (stream->demux);
2000   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2001   TRACKS_UNLOCK (stream->demux);
2002
2003   return ret;
2004 }