adaptivedemux2: Rework input download wakeups
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
63
64   stream->fragment_bitrates =
65       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
66
67   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
68
69   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
70 }
71
72 /* must be called with manifest_lock taken.
73  * It will temporarily drop the manifest_lock in order to join the task.
74  * It will join only the old_streams (the demux->streams are joined by
75  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
76  * called)
77  */
78 static void
79 gst_adaptive_demux2_stream_finalize (GObject * object)
80 {
81   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
82
83   GST_LOG_OBJECT (object, "Finalizing");
84
85   if (stream->download_request)
86     download_request_unref (stream->download_request);
87
88   stream->cancelled = TRUE;
89   g_clear_error (&stream->last_error);
90
91   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
92
93   if (stream->pending_events) {
94     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
95     stream->pending_events = NULL;
96   }
97
98   if (stream->parsebin_sink) {
99     gst_object_unref (stream->parsebin_sink);
100     stream->parsebin_sink = NULL;
101   }
102
103   if (stream->pad_added_id)
104     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
105   if (stream->pad_removed_id)
106     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
107
108   if (stream->parsebin != NULL) {
109     GST_LOG_OBJECT (stream, "Removing parsebin");
110     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
111     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
112     gst_object_unref (stream->parsebin);
113     stream->parsebin = NULL;
114   }
115
116   g_free (stream->fragment_bitrates);
117
118   g_list_free_full (stream->tracks,
119       (GDestroyNotify) gst_adaptive_demux_track_unref);
120
121   if (stream->pending_caps)
122     gst_caps_unref (stream->pending_caps);
123
124   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
125   g_clear_pointer (&stream->stream_collection, gst_object_unref);
126
127   G_OBJECT_CLASS (parent_class)->finalize (object);
128 }
129
130 /**
131  * gst_adaptive_demux2_stream_add_track:
132  * @stream: A #GstAdaptiveDemux2Stream
133  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
134  *
135  * This function is called when a subclass knows of a target @track that this
136  * @stream can provide.
137  */
138 gboolean
139 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
140     GstAdaptiveDemuxTrack * track)
141 {
142   g_return_val_if_fail (track != NULL, FALSE);
143
144   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
145       track->stream_id);
146   if (g_list_find (stream->tracks, track)) {
147     GST_DEBUG_OBJECT (stream->demux,
148         "track '%s' already handled by this stream", track->stream_id);
149     return FALSE;
150   }
151
152   stream->tracks =
153       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
154   if (stream->demux) {
155     g_assert (stream->period);
156     gst_adaptive_demux_period_add_track (stream->period, track);
157   }
158   return TRUE;
159 }
160
161 static gboolean
162 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
163 static gboolean
164 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
165 static void
166 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
167     stream);
168 static GstFlowReturn
169 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
170     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
171     gint64 end);
172
173 #ifndef GST_DISABLE_GST_DEBUG
174 static const char *
175 uritype (GstAdaptiveDemux2Stream * s)
176 {
177   if (s->downloading_header)
178     return "header";
179   if (s->downloading_index)
180     return "index";
181   return "fragment";
182 }
183 #endif
184
185 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
186 static gboolean
187 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
188 {
189   GstAdaptiveDemux *demux = stream->demux;
190   DownloadRequest *request = stream->download_request;
191   GstFlowReturn ret;
192
193   gchar *uri = request->uri;
194   gint64 range_start = request->range_start;
195   gint64 range_end = request->range_end;
196   gint64 chunk_size;
197   gint64 chunk_end;
198
199   if (range_end == -1)
200     return FALSE;               /* This was a request to the end, no more to load */
201
202   /* The size of the request that just completed: */
203   chunk_size = range_end + 1 - range_start;
204
205   if (request->content_received < chunk_size)
206     return FALSE;               /* Short read - we're done */
207
208   /* Accumulate the data we just fetched, to figure out the next
209    * request start position and update the target chunk size from
210    * the updated stream fragment info */
211   range_start += chunk_size;
212   range_end = stream->fragment.range_end;
213   chunk_size = stream->fragment.chunk_size;
214
215   if (chunk_size == 0)
216     return FALSE;               /* Sub-class doesn't want another chunk */
217
218   /* HTTP ranges are inclusive for the end */
219   if (chunk_size != -1) {
220     chunk_end = range_start + chunk_size - 1;
221     if (range_end != -1 && range_end < chunk_end)
222       chunk_end = range_end;
223   } else {
224     chunk_end = range_end;
225   }
226
227   GST_DEBUG_OBJECT (stream,
228       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
229       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
230
231   ret =
232       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
233       range_start, chunk_end);
234   if (ret != GST_FLOW_OK) {
235     GST_DEBUG_OBJECT (stream,
236         "Stopping stream due to begin download failure - ret %s",
237         gst_flow_get_name (ret));
238     gst_adaptive_demux2_stream_stop (stream);
239     return FALSE;
240   }
241
242   return TRUE;
243 }
244
245 static void
246 drain_inactive_tracks (GstAdaptiveDemux * demux,
247     GstAdaptiveDemux2Stream * stream)
248 {
249   GList *iter;
250
251   TRACKS_LOCK (demux);
252   for (iter = stream->tracks; iter; iter = iter->next) {
253     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
254     if (!track->selected) {
255       gst_adaptive_demux_track_drain_to (track,
256           demux->priv->global_output_position);
257     }
258   }
259
260   TRACKS_UNLOCK (demux);
261 }
262
263 /* Called to complete a download, either due to failure or completion
264  * Should set up the next download if necessary */
265 static void
266 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
267     stream, GstFlowReturn ret, GError * err)
268 {
269   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
270   GstAdaptiveDemux *demux = stream->demux;
271
272   GST_DEBUG_OBJECT (stream,
273       "%s download finish: %d %s - err: %p", uritype (stream), ret,
274       gst_flow_get_name (ret), err);
275
276   stream->download_finished = TRUE;
277
278   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
279    * which can look at the last_ret - so make sure it's stored before calling that.
280    * Also, for not-linked or other errors passed in that are going to make
281    * this stream stop, we'll need to store it */
282   stream->last_ret = ret;
283
284   if (err) {
285     g_clear_error (&stream->last_error);
286     stream->last_error = g_error_copy (err);
287   }
288
289   /* For actual errors, stop now, no need to call finish_fragment and get
290    * confused if it returns a non-error status, but if EOS was passed in,
291    * continue and check whether finish_fragment() says we've finished
292    * the whole manifest or just this fragment */
293   if (ret < 0 && ret != GST_FLOW_EOS) {
294     GST_INFO_OBJECT (stream,
295         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
296     gst_adaptive_demux2_stream_stop (stream);
297     return;
298   }
299
300   /* Handle all the possible flow returns here: */
301   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
302     /* We lost sync, seek back to live and return */
303     GST_WARNING_OBJECT (stream, "Lost sync when downloading");
304     gst_adaptive_demux_handle_lost_sync (demux);
305     return;
306   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
307     /* The sub-class wants to stop the fragment immediately */
308     stream->fragment.finished = TRUE;
309     ret = klass->finish_fragment (demux, stream);
310
311     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
312         gst_flow_get_name (ret));
313   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
314     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
315     /* Just mark the fragment as finished */
316     stream->fragment.finished = TRUE;
317     ret = GST_FLOW_OK;
318   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
319       || !klass->need_another_chunk (stream)
320       || stream->fragment.chunk_size == 0) {
321     stream->fragment.finished = TRUE;
322     ret = klass->finish_fragment (stream->demux, stream);
323
324     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
325         gst_flow_get_name (ret));
326   } else if (stream->fragment.chunk_size != 0
327       && schedule_another_chunk (stream)) {
328     /* Another download has already begun, no need to queue anything below */
329     return;
330   }
331
332   /* For HLS, we might be enqueueing data into tracks that aren't
333    * selected. Drain those ones out */
334   drain_inactive_tracks (stream->demux, stream);
335
336   /* Now that we've called finish_fragment we can clear these flags the
337    * sub-class might have checked */
338   if (stream->downloading_header) {
339     stream->need_header = FALSE;
340     stream->downloading_header = FALSE;
341   } else if (stream->downloading_index) {
342     stream->need_index = FALSE;
343     stream->downloading_index = FALSE;
344     /* Restart the fragment again now that header + index were loaded
345      * so that get_fragment_info() will be called again */
346     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
347   } else {
348     /* Finishing a fragment data download. Try for another */
349     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
350   }
351
352   /* if GST_FLOW_EOS was passed in that means this download is finished,
353    * but it's the result returned from finish_fragment() we really care
354    * about, as that tells us if the manifest has run out of fragments
355    * to load */
356   if (ret == GST_FLOW_EOS) {
357     stream->last_ret = ret;
358
359     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
360     return;
361   }
362
363   /* Now finally, if ret is anything other than success, we should stop this
364    * stream */
365   if (ret < 0) {
366     GST_DEBUG_OBJECT (stream,
367         "Stopping stream due to finish fragment ret %s",
368         gst_flow_get_name (ret));
369     gst_adaptive_demux2_stream_stop (stream);
370     return;
371   }
372
373   /* Clear the last_ret marker before starting a fresh download */
374   stream->last_ret = GST_FLOW_OK;
375
376   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
377   stream->pending_cb_id =
378       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
379       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
380       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
381 }
382
383 /* Must be called from the scheduler context */
384 void
385 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
386     GError * err)
387 {
388   GstAdaptiveDemux *demux = stream->demux;
389
390   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
391     return;
392
393   downloadhelper_cancel_request (demux->download_helper,
394       stream->download_request);
395
396   /* cancellation is async, so recycle our download request to avoid races */
397   download_request_unref (stream->download_request);
398   stream->download_request = download_request_new ();
399
400   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
401       err);
402 }
403
404 static void
405 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
406     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
407 {
408   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
409   GstClockTime offset =
410       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
411
412   stream->parse_segment = demux->segment;
413
414   /* The demuxer segment is just built from seek events, but for each stream
415    * we have to adjust segments according to the current period and the
416    * stream specific presentation time offset.
417    *
418    * For each period, buffer timestamps start again from 0. Additionally the
419    * buffer timestamps are shifted by the stream specific presentation time
420    * offset, so the first buffer timestamp of a period is 0 + presentation
421    * time offset. If the stream contains timestamps itself, this is also
422    * supposed to be the presentation time stored inside the stream.
423    *
424    * The stream time over periods is supposed to be continuous, that is the
425    * buffer timestamp 0 + presentation time offset should map to the start
426    * time of the current period.
427    *
428    *
429    * The adjustment of the stream segments as such works the following.
430    *
431    * If the demuxer segment start is bigger than the period start, this
432    * means that we have to drop some media at the beginning of the current
433    * period, e.g. because a seek into the middle of the period has
434    * happened. The amount of media to drop is the difference between the
435    * period start and the demuxer segment start, and as each period starts
436    * again from 0, this difference is going to be the actual stream's
437    * segment start. As all timestamps of the stream are shifted by the
438    * presentation time offset, we will also have to move the segment start
439    * by that offset.
440    *
441    * Likewise, the demuxer segment stop value is adjusted in the same
442    * fashion.
443    *
444    * Now the running time and stream time at the stream's segment start has
445    * to be the one that is stored inside the demuxer's segment, which means
446    * that segment.base and segment.time have to be copied over (done just
447    * above)
448    *
449    *
450    * If the demuxer segment start is smaller than the period start time,
451    * this means that the whole period is inside the segment. As each period
452    * starts timestamps from 0, and additionally timestamps are shifted by
453    * the presentation time offset, the stream's first timestamp (and as such
454    * the stream's segment start) has to be the presentation time offset.
455    * The stream time at the segment start is supposed to be the stream time
456    * of the period start according to the demuxer segment, so the stream
457    * segment's time would be set to that. The same goes for the stream
458    * segment's base, which is supposed to be the running time of the period
459    * start according to the demuxer's segment.
460    *
461    * The same logic applies for negative rates with the segment stop and
462    * the period stop time (which gets clamped).
463    *
464    *
465    * For the first case where not the complete period is inside the segment,
466    * the segment time and base as calculated by the second case would be
467    * equivalent.
468    */
469   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
470       &demux->segment);
471   GST_DEBUG_OBJECT (demux,
472       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
473       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
474   /* note for readers:
475    * Since stream->parse_segment is initially a copy of demux->segment,
476    * only the values that need updating are modified below. */
477   if (first_and_live) {
478     /* If first and live, demuxer did seek to the current position already */
479     stream->parse_segment.start = demux->segment.start - period_start + offset;
480     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
481       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
482     /* FIXME : Do we need to handle negative rates for this ? */
483     stream->parse_segment.position = stream->parse_segment.start;
484   } else if (demux->segment.start > period_start) {
485     /* seek within a period */
486     stream->parse_segment.start = demux->segment.start - period_start + offset;
487     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
488       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
489     if (stream->parse_segment.rate >= 0)
490       stream->parse_segment.position = offset;
491     else
492       stream->parse_segment.position = stream->parse_segment.stop;
493   } else {
494     stream->parse_segment.start = offset;
495     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
496       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
497     if (stream->parse_segment.rate >= 0) {
498       stream->parse_segment.position = offset;
499       stream->parse_segment.base =
500           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
501           period_start);
502     } else {
503       stream->parse_segment.position = stream->parse_segment.stop;
504       stream->parse_segment.base =
505           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
506           period_start + demux->segment.stop - demux->segment.start);
507     }
508     stream->parse_segment.time =
509         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
510         period_start);
511   }
512
513   stream->send_segment = TRUE;
514
515   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
516       &stream->parse_segment);
517 }
518
519 /* Segment lock hold */
520 static void
521 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
522     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
523 {
524   GstClockTimeDiff pos;
525
526   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
527       GST_STIME_ARGS (stream->fragment.stream_time));
528
529   pos = stream->fragment.stream_time;
530
531   if (GST_CLOCK_STIME_IS_VALID (pos)) {
532     GstClockTime offset =
533         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
534
535     pos += offset;
536
537     if (pos < 0) {
538       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
539       pos = 0;
540     }
541
542     GST_BUFFER_PTS (buffer) = pos;
543   } else {
544     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
545   }
546
547   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
548       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
549 }
550
551 /* Must be called from the scheduler context */
552 GstFlowReturn
553 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
554     GstBuffer * buffer)
555 {
556   GstAdaptiveDemux *demux = stream->demux;
557   GstFlowReturn ret = GST_FLOW_OK;
558   gboolean discont = FALSE;
559   /* Pending events */
560   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
561       NULL, *stream_start = NULL, *buffer_gap = NULL;
562   GList *pending_events = NULL;
563
564   if (stream->compute_segment) {
565     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
566         stream->first_and_live);
567     stream->compute_segment = FALSE;
568     stream->first_and_live = FALSE;
569   }
570
571   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
572     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
573     buffer_gap =
574         gst_event_new_gap (GST_BUFFER_PTS (buffer),
575         GST_BUFFER_DURATION (buffer));
576   }
577
578   if (stream->first_fragment_buffer) {
579     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
580     if (demux->segment.rate < 0)
581       /* Set DISCONT flag for every first buffer in reverse playback mode
582        * as each fragment for its own has to be reversed */
583       discont = TRUE;
584     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
585     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
586
587     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
588
589     /* Do we need to inject STREAM_START and SEGMENT events ?
590      *
591      * This can happen when a stream is restarted, and also when switching to a
592      * variant which needs a header (in which case downloading_header will be
593      * TRUE)
594      */
595     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
596       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
597       pending_segment = gst_event_new_segment (&stream->parse_segment);
598       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
599       stream->send_segment = FALSE;
600       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
601       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
602       stream_start = gst_event_new_stream_start ("bogus");
603       if (demux->have_group_id)
604         gst_event_set_group_id (stream_start, demux->group_id);
605     }
606   } else {
607     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
608   }
609   stream->first_fragment_buffer = FALSE;
610
611   if (stream->discont) {
612     discont = TRUE;
613     stream->discont = FALSE;
614   }
615
616   if (discont) {
617     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
618     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
619   } else {
620     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
621   }
622
623   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
624   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
625   if (G_UNLIKELY (stream->pending_caps)) {
626     pending_caps = gst_event_new_caps (stream->pending_caps);
627     gst_caps_unref (stream->pending_caps);
628     stream->pending_caps = NULL;
629   }
630
631   if (G_UNLIKELY (stream->pending_tags)) {
632     GstTagList *tags = stream->pending_tags;
633
634     stream->pending_tags = NULL;
635
636     if (tags)
637       pending_tags = gst_event_new_tag (tags);
638   }
639   if (G_UNLIKELY (stream->pending_events)) {
640     pending_events = stream->pending_events;
641     stream->pending_events = NULL;
642   }
643
644   /* Do not push events or buffers holding the manifest lock */
645   if (G_UNLIKELY (stream_start)) {
646     GST_DEBUG_OBJECT (stream,
647         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
648     gst_pad_send_event (stream->parsebin_sink, stream_start);
649   }
650   if (G_UNLIKELY (pending_caps)) {
651     GST_DEBUG_OBJECT (stream,
652         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
653     gst_pad_send_event (stream->parsebin_sink, pending_caps);
654   }
655   if (G_UNLIKELY (pending_segment)) {
656     GST_DEBUG_OBJECT (stream,
657         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
658     gst_pad_send_event (stream->parsebin_sink, pending_segment);
659   }
660   if (G_UNLIKELY (pending_tags)) {
661     GST_DEBUG_OBJECT (stream,
662         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
663     gst_pad_send_event (stream->parsebin_sink, pending_tags);
664   }
665   while (pending_events != NULL) {
666     GstEvent *event = pending_events->data;
667
668     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
669     if (!gst_pad_send_event (stream->parsebin_sink, event))
670       GST_ERROR_OBJECT (stream, "Failed to send pending event");
671
672     pending_events = g_list_delete_link (pending_events, pending_events);
673   }
674
675   GST_DEBUG_OBJECT (stream,
676       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
677       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
678       GST_BUFFER_OFFSET (buffer));
679
680   ret = gst_pad_chain (stream->parsebin_sink, buffer);
681
682   if (buffer_gap) {
683     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
684     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
685   }
686
687   if (G_UNLIKELY (stream->cancelled)) {
688     GST_LOG_OBJECT (demux, "Stream was cancelled");
689     return GST_FLOW_FLUSHING;
690   }
691
692   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
693
694   return ret;
695 }
696
697 static GstFlowReturn
698 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
699     GstBuffer * buffer)
700 {
701   GstAdaptiveDemux *demux = stream->demux;
702   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
703   GstFlowReturn ret = GST_FLOW_OK;
704
705   /* do not make any changes if the stream is cancelled */
706   if (G_UNLIKELY (stream->cancelled)) {
707     gst_buffer_unref (buffer);
708     return GST_FLOW_FLUSHING;
709   }
710
711   /* starting_fragment is set to TRUE at the beginning of
712    * _stream_download_fragment()
713    * /!\ If there is a header/index being downloaded, then this will
714    * be TRUE for the first one ... but FALSE for the remaining ones,
715    * including the *actual* fragment ! */
716   if (stream->starting_fragment) {
717     stream->starting_fragment = FALSE;
718     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
719       return GST_FLOW_ERROR;
720   }
721
722   stream->download_total_bytes += gst_buffer_get_size (buffer);
723
724   GST_TRACE_OBJECT (stream,
725       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
726       gst_buffer_get_size (buffer));
727
728   ret = klass->data_received (demux, stream, buffer);
729
730   if (ret != GST_FLOW_OK) {
731     GST_DEBUG_OBJECT (stream, "data_received returned %s",
732         gst_flow_get_name (ret));
733
734     if (ret == GST_FLOW_FLUSHING) {
735       /* do not make any changes if the stream is cancelled */
736       if (G_UNLIKELY (stream->cancelled)) {
737         return ret;
738       }
739     }
740
741     if (ret < GST_FLOW_EOS) {
742       GstEvent *eos = gst_event_new_eos ();
743       GST_ELEMENT_FLOW_ERROR (demux, ret);
744
745       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
746
747       /* TODO push this on all pads */
748       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
749       gst_pad_send_event (stream->parsebin_sink, eos);
750       ret = GST_FLOW_ERROR;
751
752       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
753     }
754   }
755
756   return ret;
757 }
758
759 /* Calculate the low and high download buffering watermarks
760  * in time as MAX (low-watermark-time, low-watermark-fragments) and
761  * MIN (high-watermark-time, high-watermark-fragments) respectively
762  */
763 static void
764 calculate_track_thresholds (GstAdaptiveDemux * demux,
765     GstClockTime fragment_duration, GstClockTime * low_threshold,
766     GstClockTime * high_threshold)
767 {
768   GST_OBJECT_LOCK (demux);
769   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
770   if (*low_threshold == 0 ||
771       (demux->buffering_low_watermark_time != 0
772           && demux->buffering_low_watermark_time > *low_threshold)) {
773     *low_threshold = demux->buffering_low_watermark_time;
774   }
775
776   *high_threshold =
777       demux->buffering_high_watermark_fragments * fragment_duration;
778   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
779           && demux->buffering_high_watermark_time < *high_threshold)) {
780     *high_threshold = demux->buffering_high_watermark_time;
781   }
782
783   /* Make sure the low and high thresholds are less than the maximum buffering
784    * time */
785   if (*high_threshold == 0 ||
786       (demux->max_buffering_time != 0
787           && demux->max_buffering_time < *high_threshold)) {
788     *high_threshold = demux->max_buffering_time;
789   }
790
791   if (*low_threshold == 0 ||
792       (demux->max_buffering_time != 0
793           && demux->max_buffering_time < *low_threshold)) {
794     *low_threshold = demux->max_buffering_time;
795   }
796
797   /* Make sure the high threshold is higher than (or equal to) the low threshold.
798    * It's OK if they are the same, as the minimum download is 1 fragment */
799   if (*high_threshold == 0 ||
800       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
801     *high_threshold = *low_threshold;
802   }
803   GST_OBJECT_UNLOCK (demux);
804 }
805
806 static gboolean
807 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
808     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
809 {
810   gboolean need_to_wait = TRUE;
811   gboolean have_any_tracks = FALSE;
812   gboolean have_active_tracks = FALSE;
813   gboolean have_filled_inactive = FALSE;
814   gboolean update_buffering = FALSE;
815
816   GstClockTime low_threshold = 0, high_threshold = 0;
817   GList *iter;
818
819   calculate_track_thresholds (demux, fragment_duration,
820       &low_threshold, &high_threshold);
821
822   /* If there are no tracks at all, don't wait. If there are no active
823    * tracks, keep filling until at least one track is full. If there
824    * are active tracks, require that they are all full */
825   TRACKS_LOCK (demux);
826   for (iter = stream->tracks; iter; iter = iter->next) {
827     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
828
829     /* Update the buffering threshold */
830     if (low_threshold != track->buffering_threshold) {
831       /* The buffering threshold for this track changed, make sure to
832        * re-check buffering status */
833       update_buffering = TRUE;
834       track->buffering_threshold = low_threshold;
835     }
836
837     have_any_tracks = TRUE;
838     if (track->active)
839       have_active_tracks = TRUE;
840
841     if (track->level_time < high_threshold) {
842       if (track->active) {
843         need_to_wait = FALSE;
844         GST_DEBUG_OBJECT (stream,
845             "track %s has level %" GST_TIME_FORMAT
846             " - needs more data (target %" GST_TIME_FORMAT
847             ") (fragment duration %" GST_TIME_FORMAT ")",
848             track->stream_id, GST_TIME_ARGS (track->level_time),
849             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
850         continue;
851       }
852     } else if (!track->active) {        /* track is over threshold and inactive */
853       have_filled_inactive = TRUE;
854     }
855
856     GST_DEBUG_OBJECT (stream,
857         "track %s active (%d) has level %" GST_TIME_FORMAT,
858         track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
859   }
860
861   /* If there are no tracks, don't wait (we might need data to create them),
862    * or if there are active tracks that need more data to hit the threshold,
863    * don't wait. Otherwise it means all active tracks are full and we should wait */
864   if (!have_any_tracks) {
865     GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
866     need_to_wait = FALSE;
867   } else if (!have_active_tracks && !have_filled_inactive) {
868     GST_DEBUG_OBJECT (stream,
869         "have only inactive tracks that need more data - not waiting");
870     need_to_wait = FALSE;
871   }
872
873   if (need_to_wait) {
874     stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
875
876     for (iter = stream->tracks; iter; iter = iter->next) {
877       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
878
879       GST_DEBUG_OBJECT (stream,
880           "Waiting for queued data on track %s to drop below %"
881           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
882           track->stream_id, GST_TIME_ARGS (high_threshold),
883           GST_TIME_ARGS (fragment_duration));
884
885       /* we want to get woken up when the global output position reaches
886        * a point where the input is closer than "high_threshold" to needing
887        * output, so we can put more data in */
888       GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
889
890       if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
891           wakeup_time < stream->next_input_wakeup_time) {
892         stream->next_input_wakeup_time = wakeup_time;
893
894         GST_DEBUG_OBJECT (stream,
895             "Track %s level %" GST_TIME_FORMAT ". Input at position %"
896             GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
897             GST_TIME_FORMAT, track->stream_id,
898             GST_TIME_ARGS (track->level_time),
899             GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
900             GST_TIME_ARGS (demux->priv->global_output_position));
901       }
902     }
903
904     if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
905       GST_DEBUG_OBJECT (stream,
906           "Next input wakeup time is now %" GST_TIME_FORMAT,
907           GST_TIME_ARGS (stream->next_input_wakeup_time));
908
909       /* If this stream needs waking up sooner than any other current one,
910        * update the period wakeup time, which is what the output loop
911        * will check */
912       GstAdaptiveDemuxPeriod *period = stream->period;
913       if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
914           period->next_input_wakeup_time > stream->next_input_wakeup_time) {
915         period->next_input_wakeup_time = stream->next_input_wakeup_time;
916       }
917     }
918   }
919
920   if (update_buffering) {
921     demux_update_buffering_locked (demux);
922     demux_post_buffering_locked (demux);
923   }
924
925   TRACKS_UNLOCK (demux);
926
927   return need_to_wait;
928 }
929
930 static GstAdaptiveDemuxTrack *
931 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
932 {
933   GList *tmp;
934   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
935   gint num_possible_tracks = 0;
936   GstStream *gst_stream;
937   const gchar *internal_stream_id;
938   GstStreamType stream_type;
939
940   gst_stream = gst_pad_get_stream (pad);
941
942   /* FIXME: Edward: Added assertion because I don't see in what cases we would
943    * end up with a pad from parsebin which wouldn't have an associated
944    * GstStream. */
945   g_assert (gst_stream);
946
947   internal_stream_id = gst_stream_get_stream_id (gst_stream);
948   stream_type = gst_stream_get_stream_type (gst_stream);
949
950   GST_DEBUG_OBJECT (pad,
951       "Trying to match pad from parsebin with internal streamid %s and caps %"
952       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
953       gst_stream_get_caps (gst_stream));
954
955   /* Try to match directly by the track's pending upstream_stream_id */
956   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
957     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
958
959     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
960       continue;
961
962     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
963         track->upstream_stream_id);
964
965     if (first_matched_track == NULL)
966       first_matched_track = track;
967     num_possible_tracks++;
968
969     /* If this track has a desired upstream stream id, match on it */
970     if (track->upstream_stream_id == NULL ||
971         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
972       /* This is not the track for this pad */
973       continue;
974     }
975
976     /* Remove pending upstream id (we have matched it for the pending
977      * stream_id) */
978     g_free (track->upstream_stream_id);
979     track->upstream_stream_id = NULL;
980     found_track = track;
981     break;
982   }
983
984   if (found_track == NULL) {
985     /* If we arrive here, it means the stream is switching pads after
986      * the stream has already started running */
987     /* No track is currently waiting for this particular stream id -
988      * try and match an existing linked track. If there's only 1 possible,
989      * take it. */
990     if (num_possible_tracks == 1 && first_matched_track != NULL) {
991       GST_LOG_OBJECT (pad, "Only one possible track to link to");
992       found_track = first_matched_track;
993     }
994   }
995
996   if (found_track == NULL) {
997     /* TODO: There are multiple possible tracks, need to match based
998      * on language code and caps. Have you found a stream like this? */
999     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1000   }
1001
1002   if (found_track != NULL) {
1003     if (!gst_pad_is_linked (found_track->sinkpad)) {
1004       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1005           found_track->sinkpad);
1006
1007       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1008         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1009         /* FIXME : Do something if we can't link ? */
1010       }
1011     } else {
1012       /* Store pad as pending link */
1013       GST_LOG_OBJECT (pad,
1014           "Remembering pad to be linked when current pad is unlinked");
1015       g_assert (found_track->pending_srcpad == NULL);
1016       found_track->pending_srcpad = gst_object_ref (pad);
1017     }
1018   }
1019
1020   if (gst_stream)
1021     gst_object_unref (gst_stream);
1022
1023   return found_track;
1024 }
1025
1026 static void
1027 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1028     GstAdaptiveDemux2Stream * stream)
1029 {
1030   GList *iter;
1031   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1032
1033   /* Remove from pending source pad */
1034   TRACKS_LOCK (stream->demux);
1035   for (iter = stream->tracks; iter; iter = iter->next) {
1036     GstAdaptiveDemuxTrack *track = iter->data;
1037     if (track->pending_srcpad == pad) {
1038       gst_object_unref (track->pending_srcpad);
1039       track->pending_srcpad = NULL;
1040       break;
1041     }
1042   }
1043   TRACKS_UNLOCK (stream->demux);
1044 }
1045
1046 static void
1047 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1048     GstAdaptiveDemux2Stream * stream)
1049 {
1050   if (!GST_PAD_IS_SRC (pad))
1051     return;
1052
1053   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1054
1055   if (!match_parsebin_to_track (stream, pad))
1056     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1057
1058   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1059 }
1060
1061 static void
1062 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1063     GstElement * element, GstAdaptiveDemux * demux)
1064 {
1065   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1066     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1067     g_object_set (element, "ignore-pcr", TRUE, NULL);
1068   }
1069 }
1070
1071 /* must be called with manifest_lock taken */
1072 static gboolean
1073 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1074 {
1075   GstAdaptiveDemux *demux = stream->demux;
1076
1077   if (stream->parsebin == NULL) {
1078     GstEvent *event;
1079
1080     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1081
1082     /* Workaround to detect if tsdemux is being used */
1083     if (tsdemux_type == 0) {
1084       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1085       if (element) {
1086         tsdemux_type = G_OBJECT_TYPE (element);
1087         gst_object_unref (element);
1088       }
1089     }
1090
1091     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1092     if (tsdemux_type)
1093       g_signal_connect (stream->parsebin, "deep-element-added",
1094           (GCallback) parsebin_deep_element_added_cb, demux);
1095     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1096     stream->parsebin_sink =
1097         gst_element_get_static_pad (stream->parsebin, "sink");
1098     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1099         G_CALLBACK (parsebin_pad_added_cb), stream);
1100     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1101         G_CALLBACK (parsebin_pad_removed_cb), stream);
1102
1103     event = gst_event_new_stream_start ("bogus");
1104     if (demux->have_group_id)
1105       gst_event_set_group_id (event, demux->group_id);
1106
1107     gst_pad_send_event (stream->parsebin_sink, event);
1108
1109     /* Not sure if these need to be outside the manifest lock: */
1110     gst_element_sync_state_with_parent (stream->parsebin);
1111     stream->last_status_code = 200;     /* default to OK */
1112   }
1113   return TRUE;
1114 }
1115
1116 static void
1117 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1118     GstAdaptiveDemux2Stream * stream)
1119 {
1120 }
1121
1122 static void
1123 on_download_error (DownloadRequest * request, DownloadRequestState state,
1124     GstAdaptiveDemux2Stream * stream)
1125 {
1126   GstAdaptiveDemux *demux = stream->demux;
1127   guint last_status_code = request->status_code;
1128   gboolean live;
1129
1130   stream->download_active = FALSE;
1131   stream->last_status_code = last_status_code;
1132
1133   GST_DEBUG_OBJECT (stream,
1134       "Download finished with error, request state %d http status %u, dc %d",
1135       request->state, last_status_code, stream->download_error_count);
1136
1137   live = gst_adaptive_demux_is_live (demux);
1138   if (((last_status_code / 100 == 4 && live)
1139           || last_status_code / 100 == 5)) {
1140     /* 4xx/5xx */
1141     /* if current position is before available start, switch to next */
1142     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1143       goto flushing;
1144
1145     if (live) {
1146       gint64 range_start, range_stop;
1147
1148       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1149               &range_stop))
1150         goto flushing;
1151
1152       if (demux->segment.position < range_start) {
1153         GstFlowReturn ret;
1154
1155         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1156         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1157
1158         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1159
1160         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1161         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1162             gst_flow_get_name (ret));
1163
1164         if (ret == GST_FLOW_OK)
1165           goto again;
1166
1167       } else if (demux->segment.position > range_stop) {
1168         /* wait a bit to be in range, we don't have any locks at that point */
1169         GstClockTime wait_time =
1170             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1171             stream);
1172         if (wait_time > 0) {
1173           GST_DEBUG_OBJECT (stream,
1174               "Download waiting for %" GST_TIME_FORMAT,
1175               GST_TIME_ARGS (wait_time));
1176           g_assert (stream->pending_cb_id == 0);
1177           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1178           stream->pending_cb_id =
1179               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1180               wait_time,
1181               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1182               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1183         }
1184       }
1185     }
1186
1187   flushing:
1188     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1189       /* looks like there is no way of knowing when a live stream has ended
1190        * Have to assume we are falling behind and cause a manifest reload */
1191       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1192       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1193       return;
1194     }
1195   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1196     /* If this is the last fragment, consider failures EOS and not actual
1197      * errors. Due to rounding errors in the durations, the last fragment
1198      * might not actually exist */
1199     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1200     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1201     return;
1202   } else {
1203     /* retry same segment */
1204     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1205       gst_adaptive_demux2_stream_error (stream);
1206       return;
1207     }
1208     goto again;
1209   }
1210
1211 again:
1212   /* wait a short time in case the server needs a bit to recover */
1213   GST_LOG_OBJECT (stream,
1214       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1215   g_assert (stream->pending_cb_id == 0);
1216   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1217       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1218       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1219 }
1220
1221 static void
1222 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1223     DownloadRequest * request)
1224 {
1225   GstClockTimeDiff last_download_duration;
1226   guint64 fragment_bytes_downloaded = request->content_received;
1227
1228   /* The stream last_download time tracks the full download time for now */
1229   stream->last_download_time =
1230       GST_CLOCK_DIFF (request->download_request_time,
1231       request->download_end_time);
1232
1233   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1234   last_download_duration =
1235       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1236
1237   /* If the entire response arrived in the first buffer
1238    * though, include the request time to get a valid
1239    * bitrate estimate */
1240   if (last_download_duration < 2 * stream->last_download_time)
1241     last_download_duration = stream->last_download_time;
1242
1243   if (last_download_duration > 0) {
1244     stream->last_bitrate =
1245         gst_util_uint64_scale (fragment_bytes_downloaded,
1246         8 * GST_SECOND, last_download_duration);
1247
1248     GST_DEBUG_OBJECT (stream,
1249         "Updated stream bitrate. %" G_GUINT64_FORMAT
1250         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1251         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1252         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1253   }
1254 }
1255
1256 static void
1257 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1258     GstAdaptiveDemux2Stream * stream)
1259 {
1260   GstAdaptiveDemux *demux = stream->demux;
1261   GstBuffer *buffer = download_request_take_buffer (request);
1262
1263   if (buffer) {
1264     GstFlowReturn ret;
1265
1266     GST_DEBUG_OBJECT (stream,
1267         "Handling buffer of %" G_GSIZE_FORMAT
1268         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1269         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1270         request->content_received, request->content_length);
1271
1272     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1273     download_request_unlock (request);
1274     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1275     download_request_lock (request);
1276
1277     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1278       return;
1279
1280     if (ret != GST_FLOW_OK) {
1281       GST_DEBUG_OBJECT (stream,
1282           "Buffer parsing returned: %d %s. Aborting download", ret,
1283           gst_flow_get_name (ret));
1284
1285       if (!stream->downloading_header && !stream->downloading_index)
1286         update_stream_bitrate (stream, request);
1287
1288       downloadhelper_cancel_request (demux->download_helper, request);
1289
1290       /* cancellation is async, so recycle our download request to avoid races */
1291       download_request_unref (stream->download_request);
1292       stream->download_request = download_request_new ();
1293
1294       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1295     }
1296   }
1297 }
1298
1299 static void
1300 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1301     GstAdaptiveDemux2Stream * stream)
1302 {
1303   GstFlowReturn ret = GST_FLOW_OK;
1304   GstBuffer *buffer;
1305
1306   stream->download_active = FALSE;
1307
1308   if (G_UNLIKELY (stream->cancelled))
1309     return;
1310
1311   GST_DEBUG_OBJECT (stream,
1312       "Stream %p %s download for %s is complete with state %d",
1313       stream, uritype (stream), request->uri, request->state);
1314
1315   /* Update bitrate for fragment downloads */
1316   if (!stream->downloading_header && !stream->downloading_index)
1317     update_stream_bitrate (stream, request);
1318
1319   buffer = download_request_take_buffer (request);
1320   if (buffer)
1321     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1322
1323   GST_DEBUG_OBJECT (stream,
1324       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1325       request->uri, ret, gst_flow_get_name (ret), stream->state);
1326
1327   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1328     return;
1329
1330   g_assert (stream->pending_cb_id == 0);
1331   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1332 }
1333
1334 /* must be called from the scheduler context
1335  *
1336  * Will submit the request only, which will complete asynchronously
1337  */
1338 static GstFlowReturn
1339 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1340     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1341     gint64 end)
1342 {
1343   DownloadRequest *request = stream->download_request;
1344
1345   GST_DEBUG_OBJECT (demux,
1346       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1347       uritype (stream), uri, start, end);
1348
1349   if (!gst_adaptive_demux2_stream_create_parser (stream))
1350     return GST_FLOW_ERROR;
1351
1352   /* Configure our download request */
1353   download_request_set_uri (request, uri, start, end);
1354
1355   if (stream->downloading_header || stream->downloading_index) {
1356     download_request_set_callbacks (request,
1357         (DownloadRequestEventCallback) on_download_complete,
1358         (DownloadRequestEventCallback) on_download_error,
1359         (DownloadRequestEventCallback) on_download_cancellation,
1360         (DownloadRequestEventCallback) NULL, stream);
1361   } else {
1362     download_request_set_callbacks (request,
1363         (DownloadRequestEventCallback) on_download_complete,
1364         (DownloadRequestEventCallback) on_download_error,
1365         (DownloadRequestEventCallback) on_download_cancellation,
1366         (DownloadRequestEventCallback) on_download_progress, stream);
1367   }
1368
1369   if (!downloadhelper_submit_request (demux->download_helper,
1370           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1371     return GST_FLOW_ERROR;
1372
1373   stream->download_active = TRUE;
1374
1375   return GST_FLOW_OK;
1376 }
1377
1378 /* must be called from the scheduler context */
1379 static GstFlowReturn
1380 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1381 {
1382   GstAdaptiveDemux *demux = stream->demux;
1383   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1384   gchar *url = NULL;
1385
1386   /* FIXME :  */
1387   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1388   if (stream->starting_fragment) {
1389     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1390         stream->fragment.uri ? "FRAGMENT " : "",
1391         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1392         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1393
1394     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1395         stream->fragment.index_uri == NULL)
1396       goto no_url_error;
1397
1398     stream->first_fragment_buffer = TRUE;
1399     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1400   }
1401
1402   if (stream->need_header && stream->fragment.header_uri != NULL) {
1403
1404     /* Set the need_index flag when we start the header if we'll also need the index */
1405     stream->need_index = (stream->fragment.index_uri != NULL);
1406
1407     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1408         G_GINT64_FORMAT, stream->fragment.header_uri,
1409         stream->fragment.header_range_start, stream->fragment.header_range_end);
1410
1411     stream->downloading_header = TRUE;
1412
1413     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1414         stream->fragment.header_uri, stream->fragment.header_range_start,
1415         stream->fragment.header_range_end);
1416   }
1417
1418   /* check if we have an index */
1419   if (stream->need_index && stream->fragment.index_uri != NULL) {
1420     GST_DEBUG_OBJECT (stream,
1421         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1422         stream->fragment.index_uri,
1423         stream->fragment.index_range_start, stream->fragment.index_range_end);
1424
1425     stream->downloading_index = TRUE;
1426
1427     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1428         stream->fragment.index_uri, stream->fragment.index_range_start,
1429         stream->fragment.index_range_end);
1430   }
1431
1432   url = stream->fragment.uri;
1433   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1434   if (!url)
1435     return GST_FLOW_OK;
1436
1437   /* Download the actual fragment, either in chunks or in one go */
1438   stream->first_fragment_buffer = TRUE;
1439
1440   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1441       && stream->fragment.chunk_size != 0) {
1442     /* Handle chunk downloading */
1443     gint64 range_start = stream->fragment.range_start;
1444     gint64 range_end = stream->fragment.range_end;
1445     gint chunk_size = stream->fragment.chunk_size;
1446     gint64 chunk_end;
1447
1448     /* HTTP ranges are inclusive for the end */
1449     if (chunk_size != -1) {
1450       chunk_end = range_start + chunk_size - 1;
1451       if (range_end != -1 && range_end < chunk_end)
1452         chunk_end = range_end;
1453     } else {
1454       chunk_end = range_end;
1455     }
1456
1457     GST_DEBUG_OBJECT (stream,
1458         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1459         url, range_start, chunk_end);
1460     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1461         range_start, chunk_end);
1462   }
1463
1464   /* regular single chunk download */
1465   stream->fragment.chunk_size = 0;
1466
1467   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1468       stream->fragment.range_start, stream->fragment.range_end);
1469
1470 no_url_error:
1471   {
1472     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1473         (_("Failed to get fragment URL.")),
1474         ("An error happened when getting fragment URL"));
1475     return GST_FLOW_ERROR;
1476   }
1477 }
1478
1479 static gboolean
1480 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1481     GstEvent * event)
1482 {
1483   gboolean ret = TRUE;
1484   GstPad *pad;
1485
1486   /* If there's a parsebin, push the event through it */
1487   if (stream->parsebin_sink != NULL) {
1488     pad = gst_object_ref (stream->parsebin_sink);
1489     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1490     ret = gst_pad_send_event (pad, gst_event_ref (event));
1491     gst_object_unref (pad);
1492   }
1493
1494   /* If the event is EOS, ensure that all tracks are EOS. This catches
1495    * the case where the parsebin hasn't parsed anything yet (we switched
1496    * to a never before used track right near EOS, or it didn't parse enough
1497    * to create pads and be able to send EOS through to the tracks.
1498    *
1499    * We don't need to care about any other events
1500    */
1501   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1502     GList *iter;
1503
1504     for (iter = stream->tracks; iter; iter = iter->next) {
1505       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1506       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1507     }
1508   }
1509
1510   gst_event_unref (event);
1511   return ret;
1512 }
1513
1514 static void
1515 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1516 {
1517   GstAdaptiveDemux *demux = stream->demux;
1518   GstMessage *msg;
1519   GstStructure *details;
1520
1521   details = gst_structure_new_empty ("details");
1522   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1523       stream->last_status_code, NULL);
1524
1525   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1526
1527   if (stream->last_error) {
1528     gchar *debug = g_strdup_printf ("Error on stream %s",
1529         GST_OBJECT_NAME (stream));
1530     msg =
1531         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1532         stream->last_error, debug, details);
1533     GST_ERROR_OBJECT (stream, "Download error: %s",
1534         stream->last_error->message);
1535     g_free (debug);
1536   } else {
1537     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1538         _("Couldn't download fragments"));
1539     msg =
1540         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1541         "Fragment downloading has failed consecutive times", details);
1542     g_error_free (err);
1543     GST_ERROR_OBJECT (stream,
1544         "Download error: Couldn't download fragments, too many failures");
1545   }
1546
1547   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1548 }
1549
1550 /* Called when a stream reaches the end of a playback segment */
1551 static void
1552 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1553 {
1554   GstAdaptiveDemux *demux = stream->demux;
1555   GstFlowReturn combined =
1556       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1557
1558   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1559
1560   if (gst_adaptive_demux_has_next_period (demux)) {
1561     if (combined == GST_FLOW_EOS) {
1562       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1563       gst_adaptive_demux_advance_period (demux);
1564     } else {
1565       /* Ensure the 'has_next_period' flag is set on the period before
1566        * pushing EOS to the stream, so that the output loop knows not
1567        * to actually output the event */
1568       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1569       demux->input_period->has_next_period = TRUE;
1570     }
1571   }
1572
1573   if (demux->priv->outputs) {
1574     GstEvent *eos = gst_event_new_eos ();
1575
1576     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1577     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1578
1579     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1580     gst_adaptive_demux2_stream_push_event (stream, eos);
1581   } else {
1582     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1583     gst_adaptive_demux2_stream_error (stream);
1584   }
1585 }
1586
1587 static gboolean
1588 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1589 {
1590   GstAdaptiveDemux *demux = stream->demux;
1591
1592   gboolean is_live = gst_adaptive_demux_is_live (demux);
1593
1594   stream->pending_cb_id = 0;
1595
1596   /* Refetch the playlist now after we waited */
1597   /* FIXME: Make this manifest update async and handle it on completion */
1598   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1599     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1600   }
1601
1602   /* We were called here from a timeout, so if the load function wants to loop
1603    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1604    * way */
1605   while (gst_adaptive_demux2_stream_next_download (stream));
1606
1607   return G_SOURCE_REMOVE;
1608 }
1609
1610 static gboolean
1611 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1612     * stream)
1613 {
1614   /* If the state already moved on, the stream was stopped, or another track
1615    * already woke up and needed data */
1616   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1617     return G_SOURCE_REMOVE;
1618
1619   GstAdaptiveDemux *demux = stream->demux;
1620   TRACKS_LOCK (demux);
1621
1622   GList *iter;
1623   for (iter = stream->tracks; iter; iter = iter->next) {
1624     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1625
1626     /* We need to recompute the track's level_time value, as the
1627      * global output position may have advanced and reduced the
1628      * value, even without anything being dequeued yet */
1629     gst_adaptive_demux_track_update_level_locked (track);
1630
1631     GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1632         " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1633         track->stream_id, GST_TIME_ARGS (track->level_time),
1634         GST_TIME_ARGS (track->input_time),
1635         GST_TIME_ARGS (demux->priv->global_output_position));
1636   }
1637   TRACKS_UNLOCK (demux);
1638
1639   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1640
1641   return G_SOURCE_REMOVE;
1642 }
1643
1644 void
1645 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1646     stream)
1647 {
1648   GstAdaptiveDemux *demux = stream->demux;
1649
1650   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1651
1652   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1653
1654   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1655       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1656       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1657 }
1658
1659 void
1660 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1661 {
1662   GstAdaptiveDemux *demux = stream->demux;
1663
1664   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1665     return;
1666
1667   g_assert (stream->pending_cb_id == 0);
1668
1669   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1670   stream->pending_cb_id =
1671       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1672       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1673       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1674 }
1675
1676 static void
1677 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1678     stream)
1679 {
1680   GstAdaptiveDemux *demux = stream->demux;
1681
1682   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1683           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1684
1685     if (!gst_adaptive_demux_has_next_period (demux)) {
1686       /* Wait only if we can ensure current manifest has been expired.
1687        * The meaning "we have next period" *WITH* EOS is that, current
1688        * period has been ended but we can continue to the next period */
1689       GST_DEBUG_OBJECT (stream,
1690           "Live playlist EOS - waiting for manifest update");
1691       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1692       /* Clear the stream last_ret EOS state, since we're not actually EOS */
1693       if (stream->last_ret == GST_FLOW_EOS)
1694         stream->last_ret = GST_FLOW_OK;
1695       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1696       return;
1697     }
1698
1699     if (stream->replaced)
1700       return;
1701   }
1702
1703   gst_adaptive_demux2_stream_end_of_manifest (stream);
1704 }
1705
1706 static gboolean
1707 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1708 {
1709   GstAdaptiveDemux *demux = stream->demux;
1710   gboolean live = gst_adaptive_demux_is_live (demux);
1711   GstFlowReturn ret = GST_FLOW_OK;
1712
1713   stream->pending_cb_id = 0;
1714
1715   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1716
1717   switch (stream->state) {
1718     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1719     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1720     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1721     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1722     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1723       /* Get information about the fragment to download */
1724       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1725       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1726       GST_DEBUG_OBJECT (stream,
1727           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1728
1729       if (ret == GST_FLOW_OK)
1730         stream->starting_fragment = TRUE;
1731       break;
1732     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1733       break;
1734     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1735       GST_ERROR_OBJECT (stream,
1736           "Unexpected stream state EOS. The stream should not be running now.");
1737       return FALSE;
1738     default:
1739       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1740       g_assert_not_reached ();
1741       break;
1742   }
1743
1744   if (ret == GST_FLOW_OK) {
1745     /* Wait for room in the output tracks */
1746     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1747             stream->fragment.duration)) {
1748       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1749       return FALSE;
1750     }
1751   }
1752
1753   if (ret == GST_FLOW_OK) {
1754     /* wait for live fragments to be available */
1755     if (live) {
1756       GstClockTime wait_time =
1757           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1758       if (wait_time > 0) {
1759         GST_DEBUG_OBJECT (stream,
1760             "Download waiting for %" GST_TIME_FORMAT,
1761             GST_TIME_ARGS (wait_time));
1762
1763         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1764
1765         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1766         g_assert (stream->pending_cb_id == 0);
1767         stream->pending_cb_id =
1768             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1769             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1770             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1771         return FALSE;
1772       }
1773     }
1774
1775     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1776       GST_ERROR_OBJECT (demux,
1777           "Failed to begin fragment download for stream %p", stream);
1778       return FALSE;
1779     }
1780   }
1781
1782   /* Cast to int avoids a compiler warning that
1783    * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1784   switch ((int) ret) {
1785     case GST_FLOW_OK:
1786       break;                    /* all is good, let's go */
1787     case GST_FLOW_EOS:
1788       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1789       stream->last_ret = ret;
1790       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1791       return FALSE;
1792     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1793       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1794       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1795       gst_adaptive_demux_handle_lost_sync (demux);
1796       return FALSE;
1797     case GST_FLOW_NOT_LINKED:
1798     {
1799       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1800
1801       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1802           == GST_FLOW_NOT_LINKED) {
1803         GST_ELEMENT_FLOW_ERROR (demux, ret);
1804       }
1805     }
1806       break;
1807
1808     case GST_FLOW_FLUSHING:
1809       /* Flushing is normal, the target track might have been unselected */
1810       if (G_UNLIKELY (stream->cancelled))
1811         return FALSE;
1812
1813     default:
1814       if (ret <= GST_FLOW_ERROR) {
1815         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1816         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1817           gst_adaptive_demux2_stream_error (stream);
1818           return FALSE;
1819         }
1820
1821         g_clear_error (&stream->last_error);
1822
1823         /* First try to update the playlist for non-live playlists
1824          * in case the URIs have changed in the meantime. But only
1825          * try it the first time, after that we're going to wait a
1826          * a bit to not flood the server */
1827         if (stream->download_error_count == 1
1828             && !gst_adaptive_demux_is_live (demux)) {
1829           /* TODO hlsdemux had more options to this function (boolean and err) */
1830           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1831             /* Retry immediately, the playlist actually has changed */
1832             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1833             return TRUE;
1834           }
1835         }
1836
1837         /* Wait half the fragment duration before retrying */
1838         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1839         g_assert (stream->pending_cb_id == 0);
1840         stream->pending_cb_id =
1841             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1842             stream->fragment.duration / 2,
1843             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1844             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1845         return FALSE;
1846       }
1847       break;
1848   }
1849
1850   return FALSE;
1851 }
1852
1853 static gboolean
1854 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1855 {
1856   GstAdaptiveDemux *demux = stream->demux;
1857   gboolean end_of_manifest = FALSE;
1858
1859   GST_LOG_OBJECT (stream, "Looking for next download");
1860
1861   /* Restarting download, figure out new position
1862    * FIXME : Move this to a separate function ? */
1863   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1864     GstClockTimeDiff stream_time = 0;
1865
1866     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1867
1868     if (stream->parsebin_sink != NULL) {
1869       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1870        * this is the first time we've used this stream, so it's all good) */
1871       gst_adaptive_demux2_stream_push_event (stream,
1872           gst_event_new_flush_start ());
1873       gst_adaptive_demux2_stream_push_event (stream,
1874           gst_event_new_flush_stop (FALSE));
1875     }
1876
1877     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1878     stream_time = stream->start_position;
1879
1880     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1881         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1882
1883     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1884       /* TODO check return */
1885       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1886           0, stream_time, &stream_time);
1887       stream->current_position = stream->start_position;
1888
1889       GST_DEBUG_OBJECT (stream,
1890           "stream_time after restart seek: %" GST_STIME_FORMAT
1891           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1892           GST_STIME_ARGS (stream->current_position));
1893     }
1894
1895     /* Trigger (re)computation of the parsebin input segment */
1896     stream->compute_segment = TRUE;
1897
1898     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1899
1900     stream->discont = TRUE;
1901     stream->need_header = TRUE;
1902     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1903   }
1904
1905   /* Check if we're done with our segment */
1906   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1907   if (demux->segment.rate > 0) {
1908     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1909         && stream->current_position >= demux->segment.stop) {
1910       end_of_manifest = TRUE;
1911     }
1912   } else {
1913     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1914         && stream->current_position <= demux->segment.start) {
1915       end_of_manifest = TRUE;
1916     }
1917   }
1918   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1919
1920   if (end_of_manifest) {
1921     gst_adaptive_demux2_stream_end_of_manifest (stream);
1922     return FALSE;
1923   }
1924   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1925 }
1926
1927 static gboolean
1928 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1929 {
1930   GstAdaptiveDemux *demux = stream->demux;
1931   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1932
1933   if (!klass->stream_can_start)
1934     return TRUE;
1935   return klass->stream_can_start (demux, stream);
1936 }
1937
1938 /**
1939  * gst_adaptive_demux2_stream_start:
1940  * @stream: a #GstAdaptiveDemux2Stream
1941  *
1942  * Start the given @stream. Should be called by subclasses that previously
1943  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1944  */
1945 void
1946 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1947 {
1948   GstAdaptiveDemux *demux;
1949
1950   g_return_if_fail (stream && stream->demux);
1951
1952   demux = stream->demux;
1953
1954   if (stream->pending_cb_id != 0 || stream->download_active) {
1955     /* There is already something active / pending on this stream */
1956     GST_LOG_OBJECT (stream, "Stream already running");
1957     return;
1958   }
1959
1960   /* Some streams require a delayed start, i.e. they need more information
1961    * before they can actually be started */
1962   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1963     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1964     return;
1965   }
1966
1967   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1968     GST_LOG_OBJECT (stream, "Stream is EOS already");
1969     return;
1970   }
1971
1972   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1973       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
1974     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
1975         stream->state);
1976     stream->cancelled = FALSE;
1977     stream->replaced = FALSE;
1978     stream->last_ret = GST_FLOW_OK;
1979
1980     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1981       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1982   }
1983
1984   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
1985   stream->pending_cb_id =
1986       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1987       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
1988       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1989 }
1990
1991 void
1992 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
1993 {
1994   GstAdaptiveDemux *demux = stream->demux;
1995
1996   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
1997   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1998
1999   if (stream->pending_cb_id != 0) {
2000     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2001         stream->pending_cb_id);
2002     stream->pending_cb_id = 0;
2003   }
2004
2005   /* Cancel and drop the existing download request */
2006   downloadhelper_cancel_request (demux->download_helper,
2007       stream->download_request);
2008   download_request_unref (stream->download_request);
2009   stream->downloading_header = stream->downloading_index = FALSE;
2010   stream->download_request = download_request_new ();
2011   stream->download_active = FALSE;
2012
2013   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
2014 }
2015
2016 gboolean
2017 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2018 {
2019   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2020     return FALSE;
2021   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2022     return FALSE;
2023   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
2024     return FALSE;
2025   return TRUE;
2026 }
2027
2028 gboolean
2029 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
2030 {
2031   GList *tmp;
2032
2033   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2034     GstAdaptiveDemuxTrack *track = tmp->data;
2035     if (track->selected)
2036       return TRUE;
2037   }
2038
2039   return FALSE;
2040 }
2041
2042 /**
2043  * gst_adaptive_demux2_stream_is_selected:
2044  * @stream: A #GstAdaptiveDemux2Stream
2045  *
2046  * Returns: %TRUE if any of the tracks targetted by @stream is selected
2047  */
2048 gboolean
2049 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
2050 {
2051   gboolean ret;
2052
2053   g_return_val_if_fail (stream && stream->demux, FALSE);
2054
2055   TRACKS_LOCK (stream->demux);
2056   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2057   TRACKS_UNLOCK (stream->demux);
2058
2059   return ret;
2060 }