Replace gst-i18n-*.h with gi18n-lib.h
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62
63   stream->fragment_bitrates =
64       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
65
66   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
67 }
68
69 /* must be called with manifest_lock taken.
70  * It will temporarily drop the manifest_lock in order to join the task.
71  * It will join only the old_streams (the demux->streams are joined by
72  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
73  * called)
74  */
75 static void
76 gst_adaptive_demux2_stream_finalize (GObject * object)
77 {
78   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
79
80   GST_LOG_OBJECT (object, "Finalizing");
81
82   if (stream->download_request)
83     download_request_unref (stream->download_request);
84
85   stream->cancelled = TRUE;
86   g_clear_error (&stream->last_error);
87
88   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
89
90   if (stream->pending_events) {
91     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
92     stream->pending_events = NULL;
93   }
94
95   if (stream->parsebin_sink) {
96     gst_object_unref (stream->parsebin_sink);
97     stream->parsebin_sink = NULL;
98   }
99
100   if (stream->pad_added_id)
101     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
102   if (stream->pad_removed_id)
103     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
104
105   g_free (stream->fragment_bitrates);
106
107   g_list_free_full (stream->tracks,
108       (GDestroyNotify) gst_adaptive_demux_track_unref);
109
110   if (stream->pending_caps)
111     gst_caps_unref (stream->pending_caps);
112
113   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
114   g_clear_pointer (&stream->stream_collection, gst_object_unref);
115
116   G_OBJECT_CLASS (parent_class)->finalize (object);
117 }
118
119 /**
120  * gst_adaptive_demux2_stream_add_track:
121  * @stream: A #GstAdaptiveDemux2Stream
122  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
123  *
124  * This function is called when a subclass knows of a target @track that this
125  * @stream can provide.
126  */
127 gboolean
128 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
129     GstAdaptiveDemuxTrack * track)
130 {
131   g_return_val_if_fail (track != NULL, FALSE);
132
133   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
134       track->stream_id);
135   if (g_list_find (stream->tracks, track)) {
136     GST_DEBUG_OBJECT (stream->demux,
137         "track '%s' already handled by this stream", track->stream_id);
138     return FALSE;
139   }
140
141   stream->tracks =
142       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
143   if (stream->demux) {
144     g_assert (stream->period);
145     gst_adaptive_demux_period_add_track (stream->period, track);
146   }
147   return TRUE;
148 }
149
150 static gboolean
151 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
152 static gboolean
153 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
154 static void
155 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
156     stream);
157 static GstFlowReturn
158 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
159     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
160     gint64 end);
161
162 #ifndef GST_DISABLE_GST_DEBUG
163 static const char *
164 uritype (GstAdaptiveDemux2Stream * s)
165 {
166   if (s->downloading_header)
167     return "header";
168   if (s->downloading_index)
169     return "index";
170   return "fragment";
171 }
172 #endif
173
174 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
175 static gboolean
176 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
177 {
178   GstAdaptiveDemux *demux = stream->demux;
179   DownloadRequest *request = stream->download_request;
180   GstFlowReturn ret;
181
182   gchar *uri = request->uri;
183   gint64 range_start = request->range_start;
184   gint64 range_end = request->range_end;
185   gint64 chunk_size;
186   gint64 chunk_end;
187
188   if (range_end == -1)
189     return FALSE;               /* This was a request to the end, no more to load */
190
191   /* The size of the request that just completed: */
192   chunk_size = range_end + 1 - range_start;
193
194   if (request->content_received < chunk_size)
195     return FALSE;               /* Short read - we're done */
196
197   /* Accumulate the data we just fetched, to figure out the next
198    * request start position and update the target chunk size from
199    * the updated stream fragment info */
200   range_start += chunk_size;
201   range_end = stream->fragment.range_end;
202   chunk_size = stream->fragment.chunk_size;
203
204   if (chunk_size == 0)
205     return FALSE;               /* Sub-class doesn't want another chunk */
206
207   /* HTTP ranges are inclusive for the end */
208   if (chunk_size != -1) {
209     chunk_end = range_start + chunk_size - 1;
210     if (range_end != -1 && range_end < chunk_end)
211       chunk_end = range_end;
212   } else {
213     chunk_end = range_end;
214   }
215
216   GST_DEBUG_OBJECT (stream,
217       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
218       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
219
220   ret =
221       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
222       range_start, chunk_end);
223   if (ret != GST_FLOW_OK) {
224     GST_DEBUG_OBJECT (stream,
225         "Stopping stream due to begin download failure - ret %s",
226         gst_flow_get_name (ret));
227     gst_adaptive_demux2_stream_stop (stream);
228     return FALSE;
229   }
230
231   return TRUE;
232 }
233
234 static void
235 drain_inactive_tracks (GstAdaptiveDemux * demux,
236     GstAdaptiveDemux2Stream * stream)
237 {
238   GList *iter;
239
240   TRACKS_LOCK (demux);
241   for (iter = stream->tracks; iter; iter = iter->next) {
242     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
243     if (!track->selected) {
244       gst_adaptive_demux_track_drain_to (track,
245           demux->priv->global_output_position);
246     }
247   }
248
249   TRACKS_UNLOCK (demux);
250 }
251
252 /* Called to complete a download, either due to failure or completion
253  * Should set up the next download if necessary */
254 static void
255 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
256     stream, GstFlowReturn ret, GError * err)
257 {
258   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
259   GstAdaptiveDemux *demux = stream->demux;
260
261   GST_DEBUG_OBJECT (stream,
262       "%s download finish: %d %s - err: %p", uritype (stream), ret,
263       gst_flow_get_name (ret), err);
264
265   stream->download_finished = TRUE;
266
267   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
268    * which can look at the last_ret - so make sure it's stored before calling that.
269    * Also, for not-linked or other errors passed in that are going to make
270    * this stream stop, we'll need to store it */
271   stream->last_ret = ret;
272
273   if (err) {
274     g_clear_error (&stream->last_error);
275     stream->last_error = g_error_copy (err);
276   }
277
278   /* For actual errors, stop now, no need to call finish_fragment and get
279    * confused if it returns a non-error status, but if EOS was passed in,
280    * continue and check whether finish_fragment() says we've finished
281    * the whole manifest or just this fragment */
282   if (ret < 0 && ret != GST_FLOW_EOS) {
283     GST_INFO_OBJECT (stream,
284         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
285     gst_adaptive_demux2_stream_stop (stream);
286     return;
287   }
288
289   /* Handle all the possible flow returns here: */
290   if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
291     /* The sub-class wants to stop the fragment immediately */
292     stream->fragment.finished = TRUE;
293     ret = klass->finish_fragment (demux, stream);
294
295     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
296         gst_flow_get_name (ret));
297   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
298     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
299     /* Just mark the fragment as finished */
300     stream->fragment.finished = TRUE;
301     ret = GST_FLOW_OK;
302   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
303       || !klass->need_another_chunk (stream)
304       || stream->fragment.chunk_size == 0) {
305     stream->fragment.finished = TRUE;
306     ret = klass->finish_fragment (stream->demux, stream);
307
308     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
309         gst_flow_get_name (ret));
310   } else if (stream->fragment.chunk_size != 0
311       && schedule_another_chunk (stream)) {
312     /* Another download has already begun, no need to queue anything below */
313     return;
314   }
315
316   /* For HLS, we might be enqueueing data into tracks that aren't
317    * selected. Drain those ones out */
318   drain_inactive_tracks (stream->demux, stream);
319
320   /* Now that we've called finish_fragment we can clear these flags the
321    * sub-class might have checked */
322   if (stream->downloading_header) {
323     stream->need_header = FALSE;
324     stream->downloading_header = FALSE;
325   } else if (stream->downloading_index) {
326     stream->need_index = FALSE;
327     stream->downloading_index = FALSE;
328     /* Restart the fragment again now that header + index were loaded
329      * so that get_fragment_info() will be called again */
330     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
331   } else {
332     /* Finishing a fragment data download. Try for another */
333     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
334   }
335
336   /* if GST_FLOW_EOS was passed in that means this download is finished,
337    * but it's the result returned from finish_fragment() we really care
338    * about, as that tells us if the manifest has run out of fragments
339    * to load */
340   if (ret == GST_FLOW_EOS) {
341     stream->last_ret = ret;
342
343     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
344     return;
345   }
346
347   /* Now finally, if ret is anything other than success, we should stop this
348    * stream */
349   if (ret < 0) {
350     GST_DEBUG_OBJECT (stream,
351         "Stopping stream due to finish fragment ret %s",
352         gst_flow_get_name (ret));
353     gst_adaptive_demux2_stream_stop (stream);
354     return;
355   }
356
357   /* Clear the last_ret marker before starting a fresh download */
358   stream->last_ret = GST_FLOW_OK;
359
360   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
361   stream->pending_cb_id =
362       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
363       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
364       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
365 }
366
367 /* Must be called from the scheduler context */
368 void
369 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
370     GError * err)
371 {
372   GstAdaptiveDemux *demux = stream->demux;
373
374   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
375     return;
376
377   downloadhelper_cancel_request (demux->download_helper,
378       stream->download_request);
379
380   /* cancellation is async, so recycle our download request to avoid races */
381   download_request_unref (stream->download_request);
382   stream->download_request = download_request_new ();
383
384   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
385       err);
386 }
387
388 static void
389 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
390     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
391 {
392   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
393   GstClockTime offset =
394       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
395
396   stream->parse_segment = demux->segment;
397
398   /* The demuxer segment is just built from seek events, but for each stream
399    * we have to adjust segments according to the current period and the
400    * stream specific presentation time offset.
401    *
402    * For each period, buffer timestamps start again from 0. Additionally the
403    * buffer timestamps are shifted by the stream specific presentation time
404    * offset, so the first buffer timestamp of a period is 0 + presentation
405    * time offset. If the stream contains timestamps itself, this is also
406    * supposed to be the presentation time stored inside the stream.
407    *
408    * The stream time over periods is supposed to be continuous, that is the
409    * buffer timestamp 0 + presentation time offset should map to the start
410    * time of the current period.
411    *
412    *
413    * The adjustment of the stream segments as such works the following.
414    *
415    * If the demuxer segment start is bigger than the period start, this
416    * means that we have to drop some media at the beginning of the current
417    * period, e.g. because a seek into the middle of the period has
418    * happened. The amount of media to drop is the difference between the
419    * period start and the demuxer segment start, and as each period starts
420    * again from 0, this difference is going to be the actual stream's
421    * segment start. As all timestamps of the stream are shifted by the
422    * presentation time offset, we will also have to move the segment start
423    * by that offset.
424    *
425    * Likewise, the demuxer segment stop value is adjusted in the same
426    * fashion.
427    *
428    * Now the running time and stream time at the stream's segment start has
429    * to be the one that is stored inside the demuxer's segment, which means
430    * that segment.base and segment.time have to be copied over (done just
431    * above)
432    *
433    *
434    * If the demuxer segment start is smaller than the period start time,
435    * this means that the whole period is inside the segment. As each period
436    * starts timestamps from 0, and additionally timestamps are shifted by
437    * the presentation time offset, the stream's first timestamp (and as such
438    * the stream's segment start) has to be the presentation time offset.
439    * The stream time at the segment start is supposed to be the stream time
440    * of the period start according to the demuxer segment, so the stream
441    * segment's time would be set to that. The same goes for the stream
442    * segment's base, which is supposed to be the running time of the period
443    * start according to the demuxer's segment.
444    *
445    * The same logic applies for negative rates with the segment stop and
446    * the period stop time (which gets clamped).
447    *
448    *
449    * For the first case where not the complete period is inside the segment,
450    * the segment time and base as calculated by the second case would be
451    * equivalent.
452    */
453   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
454       &demux->segment);
455   GST_DEBUG_OBJECT (demux,
456       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
457       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
458   /* note for readers:
459    * Since stream->parse_segment is initially a copy of demux->segment,
460    * only the values that need updating are modified below. */
461   if (first_and_live) {
462     /* If first and live, demuxer did seek to the current position already */
463     stream->parse_segment.start = demux->segment.start - period_start + offset;
464     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
465       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
466     /* FIXME : Do we need to handle negative rates for this ? */
467     stream->parse_segment.position = stream->parse_segment.start;
468   } else if (demux->segment.start > period_start) {
469     /* seek within a period */
470     stream->parse_segment.start = demux->segment.start - period_start + offset;
471     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
472       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
473     if (stream->parse_segment.rate >= 0)
474       stream->parse_segment.position = offset;
475     else
476       stream->parse_segment.position = stream->parse_segment.stop;
477   } else {
478     stream->parse_segment.start = offset;
479     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
480       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
481     if (stream->parse_segment.rate >= 0) {
482       stream->parse_segment.position = offset;
483       stream->parse_segment.base =
484           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
485           period_start);
486     } else {
487       stream->parse_segment.position = stream->parse_segment.stop;
488       stream->parse_segment.base =
489           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
490           period_start + demux->segment.stop - demux->segment.start);
491     }
492     stream->parse_segment.time =
493         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
494         period_start);
495   }
496
497   stream->send_segment = TRUE;
498
499   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
500       &stream->parse_segment);
501 }
502
503 /* Segment lock hold */
504 static void
505 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
506     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
507 {
508   GstClockTimeDiff pos;
509
510   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
511       GST_STIME_ARGS (stream->fragment.stream_time));
512
513   pos = stream->fragment.stream_time;
514
515   if (GST_CLOCK_STIME_IS_VALID (pos)) {
516     GstClockTime offset =
517         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
518
519     pos += offset;
520
521     if (pos < 0) {
522       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
523       pos = 0;
524     }
525
526     GST_BUFFER_PTS (buffer) = pos;
527   } else {
528     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
529   }
530
531   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
532       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
533 }
534
535 /* Must be called from the scheduler context */
536 GstFlowReturn
537 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
538     GstBuffer * buffer)
539 {
540   GstAdaptiveDemux *demux = stream->demux;
541   GstFlowReturn ret = GST_FLOW_OK;
542   gboolean discont = FALSE;
543   /* Pending events */
544   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
545       NULL, *stream_start = NULL, *buffer_gap = NULL;
546   GList *pending_events = NULL;
547
548   if (stream->compute_segment) {
549     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
550         stream->first_and_live);
551     stream->compute_segment = FALSE;
552     stream->first_and_live = FALSE;
553   }
554
555   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
556     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
557     buffer_gap =
558         gst_event_new_gap (GST_BUFFER_PTS (buffer),
559         GST_BUFFER_DURATION (buffer));
560   }
561
562   if (stream->first_fragment_buffer) {
563     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
564     if (demux->segment.rate < 0)
565       /* Set DISCONT flag for every first buffer in reverse playback mode
566        * as each fragment for its own has to be reversed */
567       discont = TRUE;
568     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
569     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
570
571     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
572
573     /* Do we need to inject STREAM_START and SEGMENT events ?
574      *
575      * This can happen when a stream is restarted, and also when switching to a
576      * variant which needs a header (in which case downloading_header will be
577      * TRUE)
578      */
579     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
580       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
581       pending_segment = gst_event_new_segment (&stream->parse_segment);
582       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
583       stream->send_segment = FALSE;
584       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
585       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
586       stream_start = gst_event_new_stream_start ("bogus");
587       if (demux->have_group_id)
588         gst_event_set_group_id (stream_start, demux->group_id);
589     }
590   } else {
591     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
592   }
593   stream->first_fragment_buffer = FALSE;
594
595   if (stream->discont) {
596     discont = TRUE;
597     stream->discont = FALSE;
598   }
599
600   if (discont) {
601     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
602     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
603   } else {
604     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
605   }
606
607   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
608   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
609   if (G_UNLIKELY (stream->pending_caps)) {
610     pending_caps = gst_event_new_caps (stream->pending_caps);
611     gst_caps_unref (stream->pending_caps);
612     stream->pending_caps = NULL;
613   }
614
615   if (G_UNLIKELY (stream->pending_tags)) {
616     GstTagList *tags = stream->pending_tags;
617
618     stream->pending_tags = NULL;
619
620     if (tags)
621       pending_tags = gst_event_new_tag (tags);
622   }
623   if (G_UNLIKELY (stream->pending_events)) {
624     pending_events = stream->pending_events;
625     stream->pending_events = NULL;
626   }
627
628   /* Do not push events or buffers holding the manifest lock */
629   if (G_UNLIKELY (stream_start)) {
630     GST_DEBUG_OBJECT (stream,
631         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
632     gst_pad_send_event (stream->parsebin_sink, stream_start);
633   }
634   if (G_UNLIKELY (pending_caps)) {
635     GST_DEBUG_OBJECT (stream,
636         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
637     gst_pad_send_event (stream->parsebin_sink, pending_caps);
638   }
639   if (G_UNLIKELY (pending_segment)) {
640     GST_DEBUG_OBJECT (stream,
641         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
642     gst_pad_send_event (stream->parsebin_sink, pending_segment);
643   }
644   if (G_UNLIKELY (pending_tags)) {
645     GST_DEBUG_OBJECT (stream,
646         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
647     gst_pad_send_event (stream->parsebin_sink, pending_tags);
648   }
649   while (pending_events != NULL) {
650     GstEvent *event = pending_events->data;
651
652     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
653     if (!gst_pad_send_event (stream->parsebin_sink, event))
654       GST_ERROR_OBJECT (stream, "Failed to send pending event");
655
656     pending_events = g_list_delete_link (pending_events, pending_events);
657   }
658
659   GST_DEBUG_OBJECT (stream,
660       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
661       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
662       GST_BUFFER_OFFSET (buffer));
663
664   ret = gst_pad_chain (stream->parsebin_sink, buffer);
665
666   if (buffer_gap) {
667     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
668     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
669   }
670
671   if (G_UNLIKELY (stream->cancelled)) {
672     GST_LOG_OBJECT (demux, "Stream was cancelled");
673     return GST_FLOW_FLUSHING;
674   }
675
676   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
677
678   return ret;
679 }
680
681 static GstFlowReturn
682 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
683     GstBuffer * buffer)
684 {
685   GstAdaptiveDemux *demux = stream->demux;
686   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
687   GstFlowReturn ret = GST_FLOW_OK;
688
689   /* do not make any changes if the stream is cancelled */
690   if (G_UNLIKELY (stream->cancelled)) {
691     gst_buffer_unref (buffer);
692     return GST_FLOW_FLUSHING;
693   }
694
695   /* starting_fragment is set to TRUE at the beginning of
696    * _stream_download_fragment()
697    * /!\ If there is a header/index being downloaded, then this will
698    * be TRUE for the first one ... but FALSE for the remaining ones,
699    * including the *actual* fragment ! */
700   if (stream->starting_fragment) {
701     stream->starting_fragment = FALSE;
702     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
703       return GST_FLOW_ERROR;
704   }
705
706   stream->download_total_bytes += gst_buffer_get_size (buffer);
707
708   GST_TRACE_OBJECT (stream,
709       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
710       gst_buffer_get_size (buffer));
711
712   ret = klass->data_received (demux, stream, buffer);
713
714   if (ret != GST_FLOW_OK) {
715     GST_DEBUG_OBJECT (stream, "data_received returned %s",
716         gst_flow_get_name (ret));
717
718     if (ret == GST_FLOW_FLUSHING) {
719       /* do not make any changes if the stream is cancelled */
720       if (G_UNLIKELY (stream->cancelled)) {
721         return ret;
722       }
723     }
724
725     if (ret < GST_FLOW_EOS) {
726       GstEvent *eos = gst_event_new_eos ();
727       GST_ELEMENT_FLOW_ERROR (demux, ret);
728
729       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
730
731       /* TODO push this on all pads */
732       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
733       gst_pad_send_event (stream->parsebin_sink, eos);
734       ret = GST_FLOW_ERROR;
735
736       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
737     }
738   }
739
740   return ret;
741 }
742
743 /* Calculate the low and high download buffering watermarks
744  * in time as MAX (low-watermark-time, low-watermark-fragments) and
745  * MIN (high-watermark-time, high-watermark-fragments) respectively
746  */
747 static void
748 calculate_track_thresholds (GstAdaptiveDemux * demux,
749     GstClockTime fragment_duration, GstClockTime * low_threshold,
750     GstClockTime * high_threshold)
751 {
752   GST_OBJECT_LOCK (demux);
753   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
754   if (*low_threshold == 0 ||
755       (demux->buffering_low_watermark_time != 0
756           && demux->buffering_low_watermark_time > *low_threshold)) {
757     *low_threshold = demux->buffering_low_watermark_time;
758   }
759
760   *high_threshold =
761       demux->buffering_high_watermark_fragments * fragment_duration;
762   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
763           && demux->buffering_high_watermark_time < *high_threshold)) {
764     *high_threshold = demux->buffering_high_watermark_time;
765   }
766
767   /* Make sure the low and high thresholds are less than the maximum buffering
768    * time */
769   if (*high_threshold == 0 ||
770       (demux->max_buffering_time != 0
771           && demux->max_buffering_time < *high_threshold)) {
772     *high_threshold = demux->max_buffering_time;
773   }
774
775   if (*low_threshold == 0 ||
776       (demux->max_buffering_time != 0
777           && demux->max_buffering_time < *low_threshold)) {
778     *low_threshold = demux->max_buffering_time;
779   }
780
781   /* Make sure the high threshold is higher than (or equal to) the low threshold.
782    * It's OK if they are the same, as the minimum download is 1 fragment */
783   if (*high_threshold == 0 ||
784       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
785     *high_threshold = *low_threshold;
786   }
787   GST_OBJECT_UNLOCK (demux);
788 }
789
790 static gboolean
791 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
792     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
793 {
794   gboolean need_to_wait = TRUE;
795   gboolean have_any_tracks = FALSE;
796   gboolean have_active_tracks = FALSE;
797   gboolean have_filled_inactive = FALSE;
798   gboolean update_buffering = FALSE;
799
800   GstClockTime low_threshold = 0, high_threshold = 0;
801   GList *iter;
802
803   calculate_track_thresholds (demux, fragment_duration,
804       &low_threshold, &high_threshold);
805
806   /* If there are no tracks at all, don't wait. If there are no active
807    * tracks, keep filling until at least one track is full. If there
808    * are active tracks, require that they are all full */
809   TRACKS_LOCK (demux);
810   for (iter = stream->tracks; iter; iter = iter->next) {
811     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
812
813     /* Update the buffering threshold */
814     if (low_threshold != track->buffering_threshold) {
815       /* The buffering threshold for this track changed, make sure to
816        * re-check buffering status */
817       update_buffering = TRUE;
818       track->buffering_threshold = low_threshold;
819     }
820
821     have_any_tracks = TRUE;
822     if (track->active)
823       have_active_tracks = TRUE;
824
825     if (track->level_time < high_threshold) {
826       if (track->active) {
827         need_to_wait = FALSE;
828         GST_DEBUG_OBJECT (demux,
829             "stream %p track %s has level %" GST_TIME_FORMAT
830             " - needs more data (target %" GST_TIME_FORMAT
831             ") (fragment duration %" GST_TIME_FORMAT ")", stream,
832             track->stream_id, GST_TIME_ARGS (track->level_time),
833             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
834         continue;
835       }
836     } else if (!track->active) {        /* track is over threshold and inactive */
837       have_filled_inactive = TRUE;
838     }
839
840     GST_DEBUG_OBJECT (demux,
841         "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
842         stream, track->stream_id, track->active,
843         GST_TIME_ARGS (track->level_time));
844   }
845
846   /* If there are no tracks, don't wait (we might need data to create them),
847    * or if there are active tracks that need more data to hit the threshold,
848    * don't wait. Otherwise it means all active tracks are full and we should wait */
849   if (!have_any_tracks) {
850     GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
851     need_to_wait = FALSE;
852   } else if (!have_active_tracks && !have_filled_inactive) {
853     GST_DEBUG_OBJECT (demux,
854         "stream %p has inactive tracks that need more data - not waiting",
855         stream);
856     need_to_wait = FALSE;
857   }
858
859   if (need_to_wait) {
860     for (iter = stream->tracks; iter; iter = iter->next) {
861       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
862       track->waiting_del_level = high_threshold;
863       GST_DEBUG_OBJECT (demux,
864           "Waiting for queued data on stream %p track %s to drop below %"
865           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
866           stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
867           GST_TIME_ARGS (fragment_duration));
868     }
869   }
870
871   if (update_buffering) {
872     demux_update_buffering_locked (demux);
873     demux_post_buffering_locked (demux);
874   }
875
876   TRACKS_UNLOCK (demux);
877
878   return need_to_wait;
879 }
880
881 static GstAdaptiveDemuxTrack *
882 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
883 {
884   GList *tmp;
885   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
886   gint num_possible_tracks = 0;
887   GstStream *gst_stream;
888   const gchar *internal_stream_id;
889   GstStreamType stream_type;
890
891   gst_stream = gst_pad_get_stream (pad);
892
893   /* FIXME: Edward: Added assertion because I don't see in what cases we would
894    * end up with a pad from parsebin which wouldn't have an associated
895    * GstStream. */
896   g_assert (gst_stream);
897
898   internal_stream_id = gst_stream_get_stream_id (gst_stream);
899   stream_type = gst_stream_get_stream_type (gst_stream);
900
901   GST_DEBUG_OBJECT (pad,
902       "Trying to match pad from parsebin with internal streamid %s and caps %"
903       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
904       gst_stream_get_caps (gst_stream));
905
906   /* Try to match directly by the track's pending upstream_stream_id */
907   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
908     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
909
910     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
911       continue;
912
913     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
914         track->upstream_stream_id);
915
916     if (first_matched_track == NULL)
917       first_matched_track = track;
918     num_possible_tracks++;
919
920     /* If this track has a desired upstream stream id, match on it */
921     if (track->upstream_stream_id == NULL ||
922         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
923       /* This is not the track for this pad */
924       continue;
925     }
926
927     /* Remove pending upstream id (we have matched it for the pending
928      * stream_id) */
929     g_free (track->upstream_stream_id);
930     track->upstream_stream_id = NULL;
931     found_track = track;
932     break;
933   }
934
935   if (found_track == NULL) {
936     /* If we arrive here, it means the stream is switching pads after
937      * the stream has already started running */
938     /* No track is currently waiting for this particular stream id -
939      * try and match an existing linked track. If there's only 1 possible,
940      * take it. */
941     if (num_possible_tracks == 1 && first_matched_track != NULL) {
942       GST_LOG_OBJECT (pad, "Only one possible track to link to");
943       found_track = first_matched_track;
944     }
945   }
946
947   if (found_track == NULL) {
948     /* TODO: There are multiple possible tracks, need to match based
949      * on language code and caps. Have you found a stream like this? */
950     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
951   }
952
953   if (found_track != NULL) {
954     if (!gst_pad_is_linked (found_track->sinkpad)) {
955       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
956           found_track->sinkpad);
957
958       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
959         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
960         /* FIXME : Do something if we can't link ? */
961       }
962     } else {
963       /* Store pad as pending link */
964       GST_LOG_OBJECT (pad,
965           "Remembering pad to be linked when current pad is unlinked");
966       g_assert (found_track->pending_srcpad == NULL);
967       found_track->pending_srcpad = gst_object_ref (pad);
968     }
969   }
970
971   if (gst_stream)
972     gst_object_unref (gst_stream);
973
974   return found_track;
975 }
976
977 static void
978 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
979     GstAdaptiveDemux2Stream * stream)
980 {
981   GList *iter;
982   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
983
984   /* Remove from pending source pad */
985   TRACKS_LOCK (stream->demux);
986   for (iter = stream->tracks; iter; iter = iter->next) {
987     GstAdaptiveDemuxTrack *track = iter->data;
988     if (track->pending_srcpad == pad) {
989       gst_object_unref (track->pending_srcpad);
990       track->pending_srcpad = NULL;
991       break;
992     }
993   }
994   TRACKS_UNLOCK (stream->demux);
995 }
996
997 static void
998 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
999     GstAdaptiveDemux2Stream * stream)
1000 {
1001   if (!GST_PAD_IS_SRC (pad))
1002     return;
1003
1004   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1005
1006   if (!match_parsebin_to_track (stream, pad))
1007     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1008
1009   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1010 }
1011
1012 static void
1013 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1014     GstElement * element, GstAdaptiveDemux * demux)
1015 {
1016   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1017     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1018     g_object_set (element, "ignore-pcr", TRUE, NULL);
1019   }
1020 }
1021
1022 /* must be called with manifest_lock taken */
1023 static gboolean
1024 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1025 {
1026   GstAdaptiveDemux *demux = stream->demux;
1027
1028   if (stream->parsebin == NULL) {
1029     GstEvent *event;
1030
1031     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1032
1033     /* Workaround to detect if tsdemux is being used */
1034     if (tsdemux_type == 0) {
1035       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1036       if (element) {
1037         tsdemux_type = G_OBJECT_TYPE (element);
1038         gst_object_unref (element);
1039       }
1040     }
1041
1042     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1043     if (tsdemux_type)
1044       g_signal_connect (stream->parsebin, "deep-element-added",
1045           (GCallback) parsebin_deep_element_added_cb, demux);
1046     gst_bin_add (GST_BIN_CAST (demux), stream->parsebin);
1047     stream->parsebin_sink =
1048         gst_element_get_static_pad (stream->parsebin, "sink");
1049     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1050         G_CALLBACK (parsebin_pad_added_cb), stream);
1051     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1052         G_CALLBACK (parsebin_pad_removed_cb), stream);
1053
1054     event = gst_event_new_stream_start ("bogus");
1055     if (demux->have_group_id)
1056       gst_event_set_group_id (event, demux->group_id);
1057
1058     gst_pad_send_event (stream->parsebin_sink, event);
1059
1060     /* Not sure if these need to be outside the manifest lock: */
1061     gst_element_sync_state_with_parent (stream->parsebin);
1062     stream->last_status_code = 200;     /* default to OK */
1063   }
1064   return TRUE;
1065 }
1066
1067 static void
1068 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1069     GstAdaptiveDemux2Stream * stream)
1070 {
1071 }
1072
1073 static void
1074 on_download_error (DownloadRequest * request, DownloadRequestState state,
1075     GstAdaptiveDemux2Stream * stream)
1076 {
1077   GstAdaptiveDemux *demux = stream->demux;
1078   guint last_status_code = request->status_code;
1079   gboolean live;
1080
1081   stream->download_active = FALSE;
1082   stream->last_status_code = last_status_code;
1083
1084   GST_DEBUG_OBJECT (stream,
1085       "Download finished with error, request state %d http status %u, dc %d",
1086       request->state, last_status_code, stream->download_error_count);
1087
1088   live = gst_adaptive_demux_is_live (demux);
1089   if (((last_status_code / 100 == 4 && live)
1090           || last_status_code / 100 == 5)) {
1091     /* 4xx/5xx */
1092     /* if current position is before available start, switch to next */
1093     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1094       goto flushing;
1095
1096     if (live) {
1097       gint64 range_start, range_stop;
1098
1099       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1100               &range_stop))
1101         goto flushing;
1102
1103       if (demux->segment.position < range_start) {
1104         GstFlowReturn ret;
1105
1106         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1107         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1108
1109         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1110
1111         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1112         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1113             gst_flow_get_name (ret));
1114
1115         if (ret == GST_FLOW_OK)
1116           goto again;
1117
1118       } else if (demux->segment.position > range_stop) {
1119         /* wait a bit to be in range, we don't have any locks at that point */
1120         GstClockTime wait_time =
1121             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1122             stream);
1123         if (wait_time > 0) {
1124           GST_DEBUG_OBJECT (stream,
1125               "Download waiting for %" GST_TIME_FORMAT,
1126               GST_TIME_ARGS (wait_time));
1127           g_assert (stream->pending_cb_id == 0);
1128           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1129           stream->pending_cb_id =
1130               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1131               wait_time,
1132               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1133               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1134         }
1135       }
1136     }
1137
1138   flushing:
1139     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1140       /* looks like there is no way of knowing when a live stream has ended
1141        * Have to assume we are falling behind and cause a manifest reload */
1142       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1143       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1144       return;
1145     }
1146   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1147     /* If this is the last fragment, consider failures EOS and not actual
1148      * errors. Due to rounding errors in the durations, the last fragment
1149      * might not actually exist */
1150     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1151     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1152     return;
1153   } else {
1154     /* retry same segment */
1155     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1156       gst_adaptive_demux2_stream_error (stream);
1157       return;
1158     }
1159     goto again;
1160   }
1161
1162 again:
1163   /* wait a short time in case the server needs a bit to recover */
1164   GST_LOG_OBJECT (stream,
1165       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1166   g_assert (stream->pending_cb_id == 0);
1167   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1168       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1169       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1170 }
1171
1172 static void
1173 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1174     DownloadRequest * request)
1175 {
1176   GstClockTimeDiff last_download_duration;
1177   guint64 fragment_bytes_downloaded = request->content_received;
1178
1179   /* The stream last_download time tracks the full download time for now */
1180   stream->last_download_time =
1181       GST_CLOCK_DIFF (request->download_request_time,
1182       request->download_end_time);
1183
1184   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1185   last_download_duration =
1186       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1187
1188   /* If the entire response arrived in the first buffer
1189    * though, include the request time to get a valid
1190    * bitrate estimate */
1191   if (last_download_duration < 2 * stream->last_download_time)
1192     last_download_duration = stream->last_download_time;
1193
1194   if (last_download_duration > 0) {
1195     stream->last_bitrate =
1196         gst_util_uint64_scale (fragment_bytes_downloaded,
1197         8 * GST_SECOND, last_download_duration);
1198
1199     GST_DEBUG_OBJECT (stream,
1200         "Updated stream bitrate. %" G_GUINT64_FORMAT
1201         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1202         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1203         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1204   }
1205 }
1206
1207 static void
1208 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1209     GstAdaptiveDemux2Stream * stream)
1210 {
1211   GstAdaptiveDemux *demux = stream->demux;
1212   GstBuffer *buffer = download_request_take_buffer (request);
1213
1214   if (buffer) {
1215     GstFlowReturn ret;
1216
1217     GST_DEBUG_OBJECT (stream,
1218         "Handling buffer of %" G_GSIZE_FORMAT
1219         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1220         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1221         request->content_received, request->content_length);
1222
1223     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1224     download_request_unlock (request);
1225     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1226     download_request_lock (request);
1227
1228     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1229       return;
1230
1231     if (ret != GST_FLOW_OK) {
1232       GST_DEBUG_OBJECT (stream,
1233           "Buffer parsing returned: %d %s. Aborting download", ret,
1234           gst_flow_get_name (ret));
1235
1236       if (!stream->downloading_header && !stream->downloading_index)
1237         update_stream_bitrate (stream, request);
1238
1239       downloadhelper_cancel_request (demux->download_helper, request);
1240
1241       /* cancellation is async, so recycle our download request to avoid races */
1242       download_request_unref (stream->download_request);
1243       stream->download_request = download_request_new ();
1244
1245       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1246     }
1247   }
1248 }
1249
1250 static void
1251 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1252     GstAdaptiveDemux2Stream * stream)
1253 {
1254   GstFlowReturn ret = GST_FLOW_OK;
1255   GstBuffer *buffer;
1256
1257   stream->download_active = FALSE;
1258
1259   if (G_UNLIKELY (stream->cancelled))
1260     return;
1261
1262   GST_DEBUG_OBJECT (stream,
1263       "Stream %p %s download for %s is complete with state %d",
1264       stream, uritype (stream), request->uri, request->state);
1265
1266   /* Update bitrate for fragment downloads */
1267   if (!stream->downloading_header && !stream->downloading_index)
1268     update_stream_bitrate (stream, request);
1269
1270   buffer = download_request_take_buffer (request);
1271   if (buffer)
1272     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1273
1274   GST_DEBUG_OBJECT (stream,
1275       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1276       request->uri, ret, gst_flow_get_name (ret), stream->state);
1277
1278   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1279     return;
1280
1281   g_assert (stream->pending_cb_id == 0);
1282   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1283 }
1284
1285 /* must be called from the scheduler context
1286  *
1287  * Will submit the request only, which will complete asynchronously
1288  */
1289 static GstFlowReturn
1290 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1291     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1292     gint64 end)
1293 {
1294   DownloadRequest *request = stream->download_request;
1295
1296   GST_DEBUG_OBJECT (demux,
1297       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1298       uritype (stream), uri, start, end);
1299
1300   if (!gst_adaptive_demux2_stream_create_parser (stream))
1301     return GST_FLOW_ERROR;
1302
1303   /* Configure our download request */
1304   download_request_set_uri (request, uri, start, end);
1305
1306   if (stream->downloading_header || stream->downloading_index) {
1307     download_request_set_callbacks (request,
1308         (DownloadRequestEventCallback) on_download_complete,
1309         (DownloadRequestEventCallback) on_download_error,
1310         (DownloadRequestEventCallback) on_download_cancellation,
1311         (DownloadRequestEventCallback) NULL, stream);
1312   } else {
1313     download_request_set_callbacks (request,
1314         (DownloadRequestEventCallback) on_download_complete,
1315         (DownloadRequestEventCallback) on_download_error,
1316         (DownloadRequestEventCallback) on_download_cancellation,
1317         (DownloadRequestEventCallback) on_download_progress, stream);
1318   }
1319
1320   if (!downloadhelper_submit_request (demux->download_helper,
1321           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1322     return GST_FLOW_ERROR;
1323
1324   stream->download_active = TRUE;
1325
1326   return GST_FLOW_OK;
1327 }
1328
1329 /* must be called from the scheduler context */
1330 static GstFlowReturn
1331 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1332 {
1333   GstAdaptiveDemux *demux = stream->demux;
1334   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1335   gchar *url = NULL;
1336
1337   /* FIXME :  */
1338   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1339   if (stream->starting_fragment) {
1340     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1341         stream->fragment.uri ? "FRAGMENT " : "",
1342         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1343         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1344
1345     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1346         stream->fragment.index_uri == NULL)
1347       goto no_url_error;
1348
1349     stream->first_fragment_buffer = TRUE;
1350     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1351   }
1352
1353   if (stream->need_header && stream->fragment.header_uri != NULL) {
1354
1355     /* Set the need_index flag when we start the header if we'll also need the index */
1356     stream->need_index = (stream->fragment.index_uri != NULL);
1357
1358     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1359         G_GINT64_FORMAT, stream->fragment.header_uri,
1360         stream->fragment.header_range_start, stream->fragment.header_range_end);
1361
1362     stream->downloading_header = TRUE;
1363
1364     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1365         stream->fragment.header_uri, stream->fragment.header_range_start,
1366         stream->fragment.header_range_end);
1367   }
1368
1369   /* check if we have an index */
1370   if (stream->need_index && stream->fragment.index_uri != NULL) {
1371     GST_DEBUG_OBJECT (stream,
1372         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1373         stream->fragment.index_uri,
1374         stream->fragment.index_range_start, stream->fragment.index_range_end);
1375
1376     stream->downloading_index = TRUE;
1377
1378     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1379         stream->fragment.index_uri, stream->fragment.index_range_start,
1380         stream->fragment.index_range_end);
1381   }
1382
1383   url = stream->fragment.uri;
1384   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1385   if (!url)
1386     return GST_FLOW_OK;
1387
1388   /* Download the actual fragment, either in chunks or in one go */
1389   stream->first_fragment_buffer = TRUE;
1390
1391   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1392       && stream->fragment.chunk_size != 0) {
1393     /* Handle chunk downloading */
1394     gint64 range_start = stream->fragment.range_start;
1395     gint64 range_end = stream->fragment.range_end;
1396     gint chunk_size = stream->fragment.chunk_size;
1397     gint64 chunk_end;
1398
1399     /* HTTP ranges are inclusive for the end */
1400     if (chunk_size != -1) {
1401       chunk_end = range_start + chunk_size - 1;
1402       if (range_end != -1 && range_end < chunk_end)
1403         chunk_end = range_end;
1404     } else {
1405       chunk_end = range_end;
1406     }
1407
1408     GST_DEBUG_OBJECT (stream,
1409         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1410         url, range_start, chunk_end);
1411     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1412         range_start, chunk_end);
1413   }
1414
1415   /* regular single chunk download */
1416   stream->fragment.chunk_size = 0;
1417
1418   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1419       stream->fragment.range_start, stream->fragment.range_end);
1420
1421 no_url_error:
1422   {
1423     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1424         (_("Failed to get fragment URL.")),
1425         ("An error happened when getting fragment URL"));
1426     return GST_FLOW_ERROR;
1427   }
1428 }
1429
1430 static gboolean
1431 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1432     GstEvent * event)
1433 {
1434   gboolean ret = TRUE;
1435   GstPad *pad;
1436
1437   /* If there's a parsebin, push the event through it */
1438   if (stream->parsebin_sink != NULL) {
1439     pad = gst_object_ref (stream->parsebin_sink);
1440     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1441     ret = gst_pad_send_event (pad, gst_event_ref (event));
1442     gst_object_unref (pad);
1443   }
1444
1445   /* If the event is EOS, ensure that all tracks are EOS. This catches
1446    * the case where the parsebin hasn't parsed anything yet (we switched
1447    * to a never before used track right near EOS, or it didn't parse enough
1448    * to create pads and be able to send EOS through to the tracks.
1449    *
1450    * We don't need to care about any other events
1451    */
1452   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1453     GstAdaptiveDemux *demux = stream->demux;
1454     GList *iter;
1455
1456     TRACKS_LOCK (demux);
1457     for (iter = stream->tracks; iter; iter = iter->next) {
1458       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1459       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1460     }
1461     TRACKS_UNLOCK (demux);
1462   }
1463
1464   gst_event_unref (event);
1465   return ret;
1466 }
1467
1468 static void
1469 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1470 {
1471   GstAdaptiveDemux *demux = stream->demux;
1472   GstMessage *msg;
1473   GstStructure *details;
1474
1475   details = gst_structure_new_empty ("details");
1476   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1477       stream->last_status_code, NULL);
1478
1479   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1480
1481   if (stream->last_error) {
1482     gchar *debug = g_strdup_printf ("Error on stream %s",
1483         GST_OBJECT_NAME (stream));
1484     msg =
1485         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1486         stream->last_error, debug, details);
1487     GST_ERROR_OBJECT (stream, "Download error: %s",
1488         stream->last_error->message);
1489     g_free (debug);
1490   } else {
1491     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1492         _("Couldn't download fragments"));
1493     msg =
1494         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1495         "Fragment downloading has failed consecutive times", details);
1496     g_error_free (err);
1497     GST_ERROR_OBJECT (stream,
1498         "Download error: Couldn't download fragments, too many failures");
1499   }
1500
1501   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1502 }
1503
1504 /* Called when a stream reaches the end of a playback segment */
1505 static void
1506 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1507 {
1508   GstAdaptiveDemux *demux = stream->demux;
1509   GstFlowReturn combined =
1510       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1511
1512   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1513
1514   if (demux->priv->outputs) {
1515     GstEvent *eos = gst_event_new_eos ();
1516
1517     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1518     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1519
1520     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1521     gst_adaptive_demux2_stream_push_event (stream, eos);
1522   } else {
1523     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1524     gst_adaptive_demux2_stream_error (stream);
1525   }
1526
1527   if (combined == GST_FLOW_EOS && gst_adaptive_demux_has_next_period (demux)) {
1528     GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1529     gst_adaptive_demux_advance_period (demux);
1530   }
1531 }
1532
1533 static gboolean
1534 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1535 {
1536   GstAdaptiveDemux *demux = stream->demux;
1537
1538   gboolean is_live = gst_adaptive_demux_is_live (demux);
1539
1540   stream->pending_cb_id = 0;
1541
1542   /* Refetch the playlist now after we waited */
1543   /* FIXME: Make this manifest update async and handle it on completion */
1544   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1545     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1546   }
1547
1548   /* We were called here from a timeout, so if the load function wants to loop
1549    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1550    * way */
1551   while (gst_adaptive_demux2_stream_next_download (stream));
1552
1553   return G_SOURCE_REMOVE;
1554 }
1555
1556 static gboolean
1557 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1558     * stream)
1559 {
1560   /* If the state already moved on, the stream was stopped, or another track
1561    * already woke up and needed data */
1562   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1563     return G_SOURCE_REMOVE;
1564
1565   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1566
1567   return G_SOURCE_REMOVE;
1568 }
1569
1570 void
1571 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1572     stream)
1573 {
1574   GstAdaptiveDemux *demux = stream->demux;
1575   GList *iter;
1576
1577   for (iter = stream->tracks; iter; iter = iter->next) {
1578     GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
1579     tmp_track->waiting_del_level = 0;
1580   }
1581
1582   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1583
1584   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1585       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1586       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1587 }
1588
1589 void
1590 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1591 {
1592   GstAdaptiveDemux *demux = stream->demux;
1593
1594   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1595     return;
1596
1597   g_assert (stream->pending_cb_id == 0);
1598
1599   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1600   stream->pending_cb_id =
1601       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1602       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1603       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1604 }
1605
1606 static void
1607 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1608     stream)
1609 {
1610   GstAdaptiveDemux *demux = stream->demux;
1611
1612   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1613           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1614
1615     if (!gst_adaptive_demux_has_next_period (demux)) {
1616       /* Wait only if we can ensure current manifest has been expired.
1617        * The meaning "we have next period" *WITH* EOS is that, current
1618        * period has been ended but we can continue to the next period */
1619       GST_DEBUG_OBJECT (stream,
1620           "Live playlist EOS - waiting for manifest update");
1621       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1622       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1623       return;
1624     }
1625
1626     if (stream->replaced)
1627       return;
1628   }
1629
1630   gst_adaptive_demux2_stream_end_of_manifest (stream);
1631 }
1632
1633 static gboolean
1634 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1635 {
1636   GstAdaptiveDemux *demux = stream->demux;
1637   gboolean live = gst_adaptive_demux_is_live (demux);
1638   GstFlowReturn ret = GST_FLOW_OK;
1639
1640   stream->pending_cb_id = 0;
1641
1642   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1643
1644   switch (stream->state) {
1645     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1646     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1647     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1648     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1649     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1650       /* Get information about the fragment to download */
1651       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1652       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1653       GST_DEBUG_OBJECT (stream,
1654           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1655
1656       if (ret == GST_FLOW_OK)
1657         stream->starting_fragment = TRUE;
1658       break;
1659     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1660       break;
1661     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1662       GST_ERROR_OBJECT (stream,
1663           "Unexpected stream state EOS. The stream should not be running now.");
1664       return FALSE;
1665     default:
1666       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1667       g_assert_not_reached ();
1668       break;
1669   }
1670
1671   if (ret == GST_FLOW_OK) {
1672     /* Wait for room in the output tracks */
1673     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1674             stream->fragment.duration)) {
1675       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1676       return FALSE;
1677     }
1678   }
1679
1680   if (ret == GST_FLOW_OK) {
1681     /* wait for live fragments to be available */
1682     if (live) {
1683       GstClockTime wait_time =
1684           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1685       if (wait_time > 0) {
1686         GST_DEBUG_OBJECT (stream,
1687             "Download waiting for %" GST_TIME_FORMAT,
1688             GST_TIME_ARGS (wait_time));
1689
1690         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1691
1692         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1693         g_assert (stream->pending_cb_id == 0);
1694         stream->pending_cb_id =
1695             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1696             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1697             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1698         return FALSE;
1699       }
1700     }
1701
1702     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1703       GST_ERROR_OBJECT (demux,
1704           "Failed to begin fragment download for stream %p", stream);
1705       return FALSE;
1706     }
1707   }
1708
1709   switch (ret) {
1710     case GST_FLOW_OK:
1711       break;                    /* all is good, let's go */
1712     case GST_FLOW_EOS:
1713       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1714       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1715       return FALSE;
1716     case GST_FLOW_NOT_LINKED:
1717     {
1718       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1719
1720       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1721           == GST_FLOW_NOT_LINKED) {
1722         GST_ELEMENT_FLOW_ERROR (demux, ret);
1723       }
1724     }
1725       break;
1726
1727     case GST_FLOW_FLUSHING:
1728       /* Flushing is normal, the target track might have been unselected */
1729       if (G_UNLIKELY (stream->cancelled))
1730         return FALSE;
1731
1732     default:
1733       if (ret <= GST_FLOW_ERROR) {
1734         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1735         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1736           gst_adaptive_demux2_stream_error (stream);
1737           return FALSE;
1738         }
1739
1740         g_clear_error (&stream->last_error);
1741
1742         /* First try to update the playlist for non-live playlists
1743          * in case the URIs have changed in the meantime. But only
1744          * try it the first time, after that we're going to wait a
1745          * a bit to not flood the server */
1746         if (stream->download_error_count == 1
1747             && !gst_adaptive_demux_is_live (demux)) {
1748           /* TODO hlsdemux had more options to this function (boolean and err) */
1749           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1750             /* Retry immediately, the playlist actually has changed */
1751             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1752             return TRUE;
1753           }
1754         }
1755
1756         /* Wait half the fragment duration before retrying */
1757         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1758         g_assert (stream->pending_cb_id == 0);
1759         stream->pending_cb_id =
1760             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1761             stream->fragment.duration / 2,
1762             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1763             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1764         return FALSE;
1765       }
1766       break;
1767   }
1768
1769   return FALSE;
1770 }
1771
1772 static gboolean
1773 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1774 {
1775   GstAdaptiveDemux *demux = stream->demux;
1776   gboolean end_of_manifest = FALSE;
1777
1778   GST_LOG_OBJECT (stream, "Looking for next download");
1779
1780   /* Restarting download, figure out new position
1781    * FIXME : Move this to a separate function ? */
1782   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1783     GstClockTimeDiff stream_time = 0;
1784
1785     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1786
1787     if (stream->parsebin_sink != NULL) {
1788       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1789        * this is the first time we've used this stream, so it's all good) */
1790       gst_adaptive_demux2_stream_push_event (stream,
1791           gst_event_new_flush_start ());
1792       gst_adaptive_demux2_stream_push_event (stream,
1793           gst_event_new_flush_stop (FALSE));
1794     }
1795
1796     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1797     stream_time = stream->start_position;
1798
1799     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1800         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1801
1802     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1803       /* TODO check return */
1804       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1805           0, stream_time, &stream_time);
1806       stream->current_position = stream->start_position;
1807
1808       GST_DEBUG_OBJECT (stream,
1809           "stream_time after restart seek: %" GST_STIME_FORMAT
1810           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1811           GST_STIME_ARGS (stream->current_position));
1812     }
1813
1814     /* Trigger (re)computation of the parsebin input segment */
1815     stream->compute_segment = TRUE;
1816
1817     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1818
1819     stream->discont = TRUE;
1820     stream->need_header = TRUE;
1821     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1822   }
1823
1824   /* Check if we're done with our segment */
1825   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1826   if (demux->segment.rate > 0) {
1827     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1828         && stream->current_position >= demux->segment.stop) {
1829       end_of_manifest = TRUE;
1830     }
1831   } else {
1832     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1833         && stream->current_position <= demux->segment.start) {
1834       end_of_manifest = TRUE;
1835     }
1836   }
1837   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1838
1839   if (end_of_manifest) {
1840     gst_adaptive_demux2_stream_end_of_manifest (stream);
1841     return FALSE;
1842   }
1843   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1844 }
1845
1846 static gboolean
1847 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1848 {
1849   GstAdaptiveDemux *demux = stream->demux;
1850   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1851
1852   if (!klass->stream_can_start)
1853     return TRUE;
1854   return klass->stream_can_start (demux, stream);
1855 }
1856
1857 /**
1858  * gst_adaptive_demux2_stream_start:
1859  * @stream: a #GstAdaptiveDemux2Stream
1860  *
1861  * Start the given @stream. Should be called by subclasses that previously
1862  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1863  */
1864 void
1865 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1866 {
1867   GstAdaptiveDemux *demux;
1868
1869   g_return_if_fail (stream && stream->demux);
1870
1871   demux = stream->demux;
1872
1873   if (stream->pending_cb_id != 0 || stream->download_active) {
1874     /* There is already something active / pending on this stream */
1875     GST_LOG_OBJECT (stream, "Stream already running");
1876     return;
1877   }
1878
1879   /* Some streams require a delayed start, i.e. they need more information
1880    * before they can actually be started */
1881   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1882     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1883     return;
1884   }
1885
1886   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1887     GST_LOG_OBJECT (stream, "Stream is EOS already");
1888     return;
1889   }
1890
1891   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1892       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
1893     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
1894         stream->state);
1895     stream->cancelled = FALSE;
1896     stream->replaced = FALSE;
1897     stream->last_ret = GST_FLOW_OK;
1898
1899     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1900       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1901   }
1902
1903   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
1904   stream->pending_cb_id =
1905       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1906       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
1907       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1908 }
1909
1910 void
1911 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
1912 {
1913   GstAdaptiveDemux *demux = stream->demux;
1914
1915   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
1916   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1917
1918   if (stream->pending_cb_id != 0) {
1919     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
1920         stream->pending_cb_id);
1921     stream->pending_cb_id = 0;
1922   }
1923
1924   /* Cancel and drop the existing download request */
1925   downloadhelper_cancel_request (demux->download_helper,
1926       stream->download_request);
1927   download_request_unref (stream->download_request);
1928   stream->downloading_header = stream->downloading_index = FALSE;
1929   stream->download_request = download_request_new ();
1930   stream->download_active = FALSE;
1931 }
1932
1933 gboolean
1934 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
1935 {
1936   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1937     return FALSE;
1938   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
1939     return FALSE;
1940   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
1941     return FALSE;
1942   return TRUE;
1943 }
1944
1945 gboolean
1946 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
1947 {
1948   GList *tmp;
1949
1950   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
1951     GstAdaptiveDemuxTrack *track = tmp->data;
1952     if (track->selected)
1953       return TRUE;
1954   }
1955
1956   return FALSE;
1957 }
1958
1959 /**
1960  * gst_adaptive_demux2_stream_is_selected:
1961  * @stream: A #GstAdaptiveDemux2Stream
1962  *
1963  * Returns: %TRUE if any of the tracks targetted by @stream is selected
1964  */
1965 gboolean
1966 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
1967 {
1968   gboolean ret;
1969
1970   g_return_val_if_fail (stream && stream->demux, FALSE);
1971
1972   TRACKS_LOCK (stream->demux);
1973   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
1974   TRACKS_UNLOCK (stream->demux);
1975
1976   return ret;
1977 }