adaptivedemux2: Initialize and use stream start/current position
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62
63   stream->fragment_bitrates =
64       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
65
66   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
67
68   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
69 }
70
71 /* must be called with manifest_lock taken.
72  * It will temporarily drop the manifest_lock in order to join the task.
73  * It will join only the old_streams (the demux->streams are joined by
74  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
75  * called)
76  */
77 static void
78 gst_adaptive_demux2_stream_finalize (GObject * object)
79 {
80   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
81
82   GST_LOG_OBJECT (object, "Finalizing");
83
84   if (stream->download_request)
85     download_request_unref (stream->download_request);
86
87   stream->cancelled = TRUE;
88   g_clear_error (&stream->last_error);
89
90   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
91
92   if (stream->pending_events) {
93     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
94     stream->pending_events = NULL;
95   }
96
97   if (stream->parsebin_sink) {
98     gst_object_unref (stream->parsebin_sink);
99     stream->parsebin_sink = NULL;
100   }
101
102   if (stream->pad_added_id)
103     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
104   if (stream->pad_removed_id)
105     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
106
107   g_free (stream->fragment_bitrates);
108
109   g_list_free_full (stream->tracks,
110       (GDestroyNotify) gst_adaptive_demux_track_unref);
111
112   if (stream->pending_caps)
113     gst_caps_unref (stream->pending_caps);
114
115   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
116   g_clear_pointer (&stream->stream_collection, gst_object_unref);
117
118   G_OBJECT_CLASS (parent_class)->finalize (object);
119 }
120
121 /**
122  * gst_adaptive_demux2_stream_add_track:
123  * @stream: A #GstAdaptiveDemux2Stream
124  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
125  *
126  * This function is called when a subclass knows of a target @track that this
127  * @stream can provide.
128  */
129 gboolean
130 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
131     GstAdaptiveDemuxTrack * track)
132 {
133   g_return_val_if_fail (track != NULL, FALSE);
134
135   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
136       track->stream_id);
137   if (g_list_find (stream->tracks, track)) {
138     GST_DEBUG_OBJECT (stream->demux,
139         "track '%s' already handled by this stream", track->stream_id);
140     return FALSE;
141   }
142
143   stream->tracks =
144       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
145   if (stream->demux) {
146     g_assert (stream->period);
147     gst_adaptive_demux_period_add_track (stream->period, track);
148   }
149   return TRUE;
150 }
151
152 static gboolean
153 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
154 static gboolean
155 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
156 static void
157 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
158     stream);
159 static GstFlowReturn
160 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
161     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
162     gint64 end);
163
164 #ifndef GST_DISABLE_GST_DEBUG
165 static const char *
166 uritype (GstAdaptiveDemux2Stream * s)
167 {
168   if (s->downloading_header)
169     return "header";
170   if (s->downloading_index)
171     return "index";
172   return "fragment";
173 }
174 #endif
175
176 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
177 static gboolean
178 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
179 {
180   GstAdaptiveDemux *demux = stream->demux;
181   DownloadRequest *request = stream->download_request;
182   GstFlowReturn ret;
183
184   gchar *uri = request->uri;
185   gint64 range_start = request->range_start;
186   gint64 range_end = request->range_end;
187   gint64 chunk_size;
188   gint64 chunk_end;
189
190   if (range_end == -1)
191     return FALSE;               /* This was a request to the end, no more to load */
192
193   /* The size of the request that just completed: */
194   chunk_size = range_end + 1 - range_start;
195
196   if (request->content_received < chunk_size)
197     return FALSE;               /* Short read - we're done */
198
199   /* Accumulate the data we just fetched, to figure out the next
200    * request start position and update the target chunk size from
201    * the updated stream fragment info */
202   range_start += chunk_size;
203   range_end = stream->fragment.range_end;
204   chunk_size = stream->fragment.chunk_size;
205
206   if (chunk_size == 0)
207     return FALSE;               /* Sub-class doesn't want another chunk */
208
209   /* HTTP ranges are inclusive for the end */
210   if (chunk_size != -1) {
211     chunk_end = range_start + chunk_size - 1;
212     if (range_end != -1 && range_end < chunk_end)
213       chunk_end = range_end;
214   } else {
215     chunk_end = range_end;
216   }
217
218   GST_DEBUG_OBJECT (stream,
219       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
220       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
221
222   ret =
223       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
224       range_start, chunk_end);
225   if (ret != GST_FLOW_OK) {
226     GST_DEBUG_OBJECT (stream,
227         "Stopping stream due to begin download failure - ret %s",
228         gst_flow_get_name (ret));
229     gst_adaptive_demux2_stream_stop (stream);
230     return FALSE;
231   }
232
233   return TRUE;
234 }
235
236 static void
237 drain_inactive_tracks (GstAdaptiveDemux * demux,
238     GstAdaptiveDemux2Stream * stream)
239 {
240   GList *iter;
241
242   TRACKS_LOCK (demux);
243   for (iter = stream->tracks; iter; iter = iter->next) {
244     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
245     if (!track->selected) {
246       gst_adaptive_demux_track_drain_to (track,
247           demux->priv->global_output_position);
248     }
249   }
250
251   TRACKS_UNLOCK (demux);
252 }
253
254 /* Called to complete a download, either due to failure or completion
255  * Should set up the next download if necessary */
256 static void
257 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
258     stream, GstFlowReturn ret, GError * err)
259 {
260   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
261   GstAdaptiveDemux *demux = stream->demux;
262
263   GST_DEBUG_OBJECT (stream,
264       "%s download finish: %d %s - err: %p", uritype (stream), ret,
265       gst_flow_get_name (ret), err);
266
267   stream->download_finished = TRUE;
268
269   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
270    * which can look at the last_ret - so make sure it's stored before calling that.
271    * Also, for not-linked or other errors passed in that are going to make
272    * this stream stop, we'll need to store it */
273   stream->last_ret = ret;
274
275   if (err) {
276     g_clear_error (&stream->last_error);
277     stream->last_error = g_error_copy (err);
278   }
279
280   /* For actual errors, stop now, no need to call finish_fragment and get
281    * confused if it returns a non-error status, but if EOS was passed in,
282    * continue and check whether finish_fragment() says we've finished
283    * the whole manifest or just this fragment */
284   if (ret < 0 && ret != GST_FLOW_EOS) {
285     GST_INFO_OBJECT (stream,
286         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
287     gst_adaptive_demux2_stream_stop (stream);
288     return;
289   }
290
291   /* Handle all the possible flow returns here: */
292   if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
293     /* The sub-class wants to stop the fragment immediately */
294     stream->fragment.finished = TRUE;
295     ret = klass->finish_fragment (demux, stream);
296
297     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
298         gst_flow_get_name (ret));
299   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
300     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
301     /* Just mark the fragment as finished */
302     stream->fragment.finished = TRUE;
303     ret = GST_FLOW_OK;
304   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
305       || !klass->need_another_chunk (stream)
306       || stream->fragment.chunk_size == 0) {
307     stream->fragment.finished = TRUE;
308     ret = klass->finish_fragment (stream->demux, stream);
309
310     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
311         gst_flow_get_name (ret));
312   } else if (stream->fragment.chunk_size != 0
313       && schedule_another_chunk (stream)) {
314     /* Another download has already begun, no need to queue anything below */
315     return;
316   }
317
318   /* For HLS, we might be enqueueing data into tracks that aren't
319    * selected. Drain those ones out */
320   drain_inactive_tracks (stream->demux, stream);
321
322   /* Now that we've called finish_fragment we can clear these flags the
323    * sub-class might have checked */
324   if (stream->downloading_header) {
325     stream->need_header = FALSE;
326     stream->downloading_header = FALSE;
327   } else if (stream->downloading_index) {
328     stream->need_index = FALSE;
329     stream->downloading_index = FALSE;
330     /* Restart the fragment again now that header + index were loaded
331      * so that get_fragment_info() will be called again */
332     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
333   } else {
334     /* Finishing a fragment data download. Try for another */
335     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
336   }
337
338   /* if GST_FLOW_EOS was passed in that means this download is finished,
339    * but it's the result returned from finish_fragment() we really care
340    * about, as that tells us if the manifest has run out of fragments
341    * to load */
342   if (ret == GST_FLOW_EOS) {
343     stream->last_ret = ret;
344
345     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
346     return;
347   }
348
349   /* Now finally, if ret is anything other than success, we should stop this
350    * stream */
351   if (ret < 0) {
352     GST_DEBUG_OBJECT (stream,
353         "Stopping stream due to finish fragment ret %s",
354         gst_flow_get_name (ret));
355     gst_adaptive_demux2_stream_stop (stream);
356     return;
357   }
358
359   /* Clear the last_ret marker before starting a fresh download */
360   stream->last_ret = GST_FLOW_OK;
361
362   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
363   stream->pending_cb_id =
364       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
365       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
366       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
367 }
368
369 /* Must be called from the scheduler context */
370 void
371 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
372     GError * err)
373 {
374   GstAdaptiveDemux *demux = stream->demux;
375
376   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
377     return;
378
379   downloadhelper_cancel_request (demux->download_helper,
380       stream->download_request);
381
382   /* cancellation is async, so recycle our download request to avoid races */
383   download_request_unref (stream->download_request);
384   stream->download_request = download_request_new ();
385
386   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
387       err);
388 }
389
390 static void
391 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
392     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
393 {
394   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
395   GstClockTime offset =
396       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
397
398   stream->parse_segment = demux->segment;
399
400   /* The demuxer segment is just built from seek events, but for each stream
401    * we have to adjust segments according to the current period and the
402    * stream specific presentation time offset.
403    *
404    * For each period, buffer timestamps start again from 0. Additionally the
405    * buffer timestamps are shifted by the stream specific presentation time
406    * offset, so the first buffer timestamp of a period is 0 + presentation
407    * time offset. If the stream contains timestamps itself, this is also
408    * supposed to be the presentation time stored inside the stream.
409    *
410    * The stream time over periods is supposed to be continuous, that is the
411    * buffer timestamp 0 + presentation time offset should map to the start
412    * time of the current period.
413    *
414    *
415    * The adjustment of the stream segments as such works the following.
416    *
417    * If the demuxer segment start is bigger than the period start, this
418    * means that we have to drop some media at the beginning of the current
419    * period, e.g. because a seek into the middle of the period has
420    * happened. The amount of media to drop is the difference between the
421    * period start and the demuxer segment start, and as each period starts
422    * again from 0, this difference is going to be the actual stream's
423    * segment start. As all timestamps of the stream are shifted by the
424    * presentation time offset, we will also have to move the segment start
425    * by that offset.
426    *
427    * Likewise, the demuxer segment stop value is adjusted in the same
428    * fashion.
429    *
430    * Now the running time and stream time at the stream's segment start has
431    * to be the one that is stored inside the demuxer's segment, which means
432    * that segment.base and segment.time have to be copied over (done just
433    * above)
434    *
435    *
436    * If the demuxer segment start is smaller than the period start time,
437    * this means that the whole period is inside the segment. As each period
438    * starts timestamps from 0, and additionally timestamps are shifted by
439    * the presentation time offset, the stream's first timestamp (and as such
440    * the stream's segment start) has to be the presentation time offset.
441    * The stream time at the segment start is supposed to be the stream time
442    * of the period start according to the demuxer segment, so the stream
443    * segment's time would be set to that. The same goes for the stream
444    * segment's base, which is supposed to be the running time of the period
445    * start according to the demuxer's segment.
446    *
447    * The same logic applies for negative rates with the segment stop and
448    * the period stop time (which gets clamped).
449    *
450    *
451    * For the first case where not the complete period is inside the segment,
452    * the segment time and base as calculated by the second case would be
453    * equivalent.
454    */
455   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
456       &demux->segment);
457   GST_DEBUG_OBJECT (demux,
458       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
459       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
460   /* note for readers:
461    * Since stream->parse_segment is initially a copy of demux->segment,
462    * only the values that need updating are modified below. */
463   if (first_and_live) {
464     /* If first and live, demuxer did seek to the current position already */
465     stream->parse_segment.start = demux->segment.start - period_start + offset;
466     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
467       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
468     /* FIXME : Do we need to handle negative rates for this ? */
469     stream->parse_segment.position = stream->parse_segment.start;
470   } else if (demux->segment.start > period_start) {
471     /* seek within a period */
472     stream->parse_segment.start = demux->segment.start - period_start + offset;
473     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
474       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
475     if (stream->parse_segment.rate >= 0)
476       stream->parse_segment.position = offset;
477     else
478       stream->parse_segment.position = stream->parse_segment.stop;
479   } else {
480     stream->parse_segment.start = offset;
481     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
482       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
483     if (stream->parse_segment.rate >= 0) {
484       stream->parse_segment.position = offset;
485       stream->parse_segment.base =
486           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
487           period_start);
488     } else {
489       stream->parse_segment.position = stream->parse_segment.stop;
490       stream->parse_segment.base =
491           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
492           period_start + demux->segment.stop - demux->segment.start);
493     }
494     stream->parse_segment.time =
495         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
496         period_start);
497   }
498
499   stream->send_segment = TRUE;
500
501   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
502       &stream->parse_segment);
503 }
504
505 /* Segment lock hold */
506 static void
507 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
508     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
509 {
510   GstClockTimeDiff pos;
511
512   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
513       GST_STIME_ARGS (stream->fragment.stream_time));
514
515   pos = stream->fragment.stream_time;
516
517   if (GST_CLOCK_STIME_IS_VALID (pos)) {
518     GstClockTime offset =
519         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
520
521     pos += offset;
522
523     if (pos < 0) {
524       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
525       pos = 0;
526     }
527
528     GST_BUFFER_PTS (buffer) = pos;
529   } else {
530     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
531   }
532
533   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
534       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
535 }
536
537 /* Must be called from the scheduler context */
538 GstFlowReturn
539 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
540     GstBuffer * buffer)
541 {
542   GstAdaptiveDemux *demux = stream->demux;
543   GstFlowReturn ret = GST_FLOW_OK;
544   gboolean discont = FALSE;
545   /* Pending events */
546   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
547       NULL, *stream_start = NULL, *buffer_gap = NULL;
548   GList *pending_events = NULL;
549
550   if (stream->compute_segment) {
551     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
552         stream->first_and_live);
553     stream->compute_segment = FALSE;
554     stream->first_and_live = FALSE;
555   }
556
557   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
558     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
559     buffer_gap =
560         gst_event_new_gap (GST_BUFFER_PTS (buffer),
561         GST_BUFFER_DURATION (buffer));
562   }
563
564   if (stream->first_fragment_buffer) {
565     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
566     if (demux->segment.rate < 0)
567       /* Set DISCONT flag for every first buffer in reverse playback mode
568        * as each fragment for its own has to be reversed */
569       discont = TRUE;
570     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
571     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
572
573     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
574
575     /* Do we need to inject STREAM_START and SEGMENT events ?
576      *
577      * This can happen when a stream is restarted, and also when switching to a
578      * variant which needs a header (in which case downloading_header will be
579      * TRUE)
580      */
581     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
582       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
583       pending_segment = gst_event_new_segment (&stream->parse_segment);
584       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
585       stream->send_segment = FALSE;
586       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
587       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
588       stream_start = gst_event_new_stream_start ("bogus");
589       if (demux->have_group_id)
590         gst_event_set_group_id (stream_start, demux->group_id);
591     }
592   } else {
593     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
594   }
595   stream->first_fragment_buffer = FALSE;
596
597   if (stream->discont) {
598     discont = TRUE;
599     stream->discont = FALSE;
600   }
601
602   if (discont) {
603     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
604     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
605   } else {
606     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
607   }
608
609   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
610   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
611   if (G_UNLIKELY (stream->pending_caps)) {
612     pending_caps = gst_event_new_caps (stream->pending_caps);
613     gst_caps_unref (stream->pending_caps);
614     stream->pending_caps = NULL;
615   }
616
617   if (G_UNLIKELY (stream->pending_tags)) {
618     GstTagList *tags = stream->pending_tags;
619
620     stream->pending_tags = NULL;
621
622     if (tags)
623       pending_tags = gst_event_new_tag (tags);
624   }
625   if (G_UNLIKELY (stream->pending_events)) {
626     pending_events = stream->pending_events;
627     stream->pending_events = NULL;
628   }
629
630   /* Do not push events or buffers holding the manifest lock */
631   if (G_UNLIKELY (stream_start)) {
632     GST_DEBUG_OBJECT (stream,
633         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
634     gst_pad_send_event (stream->parsebin_sink, stream_start);
635   }
636   if (G_UNLIKELY (pending_caps)) {
637     GST_DEBUG_OBJECT (stream,
638         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
639     gst_pad_send_event (stream->parsebin_sink, pending_caps);
640   }
641   if (G_UNLIKELY (pending_segment)) {
642     GST_DEBUG_OBJECT (stream,
643         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
644     gst_pad_send_event (stream->parsebin_sink, pending_segment);
645   }
646   if (G_UNLIKELY (pending_tags)) {
647     GST_DEBUG_OBJECT (stream,
648         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
649     gst_pad_send_event (stream->parsebin_sink, pending_tags);
650   }
651   while (pending_events != NULL) {
652     GstEvent *event = pending_events->data;
653
654     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
655     if (!gst_pad_send_event (stream->parsebin_sink, event))
656       GST_ERROR_OBJECT (stream, "Failed to send pending event");
657
658     pending_events = g_list_delete_link (pending_events, pending_events);
659   }
660
661   GST_DEBUG_OBJECT (stream,
662       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
663       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
664       GST_BUFFER_OFFSET (buffer));
665
666   ret = gst_pad_chain (stream->parsebin_sink, buffer);
667
668   if (buffer_gap) {
669     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
670     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
671   }
672
673   if (G_UNLIKELY (stream->cancelled)) {
674     GST_LOG_OBJECT (demux, "Stream was cancelled");
675     return GST_FLOW_FLUSHING;
676   }
677
678   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
679
680   return ret;
681 }
682
683 static GstFlowReturn
684 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
685     GstBuffer * buffer)
686 {
687   GstAdaptiveDemux *demux = stream->demux;
688   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
689   GstFlowReturn ret = GST_FLOW_OK;
690
691   /* do not make any changes if the stream is cancelled */
692   if (G_UNLIKELY (stream->cancelled)) {
693     gst_buffer_unref (buffer);
694     return GST_FLOW_FLUSHING;
695   }
696
697   /* starting_fragment is set to TRUE at the beginning of
698    * _stream_download_fragment()
699    * /!\ If there is a header/index being downloaded, then this will
700    * be TRUE for the first one ... but FALSE for the remaining ones,
701    * including the *actual* fragment ! */
702   if (stream->starting_fragment) {
703     stream->starting_fragment = FALSE;
704     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
705       return GST_FLOW_ERROR;
706   }
707
708   stream->download_total_bytes += gst_buffer_get_size (buffer);
709
710   GST_TRACE_OBJECT (stream,
711       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
712       gst_buffer_get_size (buffer));
713
714   ret = klass->data_received (demux, stream, buffer);
715
716   if (ret != GST_FLOW_OK) {
717     GST_DEBUG_OBJECT (stream, "data_received returned %s",
718         gst_flow_get_name (ret));
719
720     if (ret == GST_FLOW_FLUSHING) {
721       /* do not make any changes if the stream is cancelled */
722       if (G_UNLIKELY (stream->cancelled)) {
723         return ret;
724       }
725     }
726
727     if (ret < GST_FLOW_EOS) {
728       GstEvent *eos = gst_event_new_eos ();
729       GST_ELEMENT_FLOW_ERROR (demux, ret);
730
731       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
732
733       /* TODO push this on all pads */
734       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
735       gst_pad_send_event (stream->parsebin_sink, eos);
736       ret = GST_FLOW_ERROR;
737
738       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
739     }
740   }
741
742   return ret;
743 }
744
745 /* Calculate the low and high download buffering watermarks
746  * in time as MAX (low-watermark-time, low-watermark-fragments) and
747  * MIN (high-watermark-time, high-watermark-fragments) respectively
748  */
749 static void
750 calculate_track_thresholds (GstAdaptiveDemux * demux,
751     GstClockTime fragment_duration, GstClockTime * low_threshold,
752     GstClockTime * high_threshold)
753 {
754   GST_OBJECT_LOCK (demux);
755   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
756   if (*low_threshold == 0 ||
757       (demux->buffering_low_watermark_time != 0
758           && demux->buffering_low_watermark_time > *low_threshold)) {
759     *low_threshold = demux->buffering_low_watermark_time;
760   }
761
762   *high_threshold =
763       demux->buffering_high_watermark_fragments * fragment_duration;
764   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
765           && demux->buffering_high_watermark_time < *high_threshold)) {
766     *high_threshold = demux->buffering_high_watermark_time;
767   }
768
769   /* Make sure the low and high thresholds are less than the maximum buffering
770    * time */
771   if (*high_threshold == 0 ||
772       (demux->max_buffering_time != 0
773           && demux->max_buffering_time < *high_threshold)) {
774     *high_threshold = demux->max_buffering_time;
775   }
776
777   if (*low_threshold == 0 ||
778       (demux->max_buffering_time != 0
779           && demux->max_buffering_time < *low_threshold)) {
780     *low_threshold = demux->max_buffering_time;
781   }
782
783   /* Make sure the high threshold is higher than (or equal to) the low threshold.
784    * It's OK if they are the same, as the minimum download is 1 fragment */
785   if (*high_threshold == 0 ||
786       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
787     *high_threshold = *low_threshold;
788   }
789   GST_OBJECT_UNLOCK (demux);
790 }
791
792 static gboolean
793 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
794     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
795 {
796   gboolean need_to_wait = TRUE;
797   gboolean have_any_tracks = FALSE;
798   gboolean have_active_tracks = FALSE;
799   gboolean have_filled_inactive = FALSE;
800   gboolean update_buffering = FALSE;
801
802   GstClockTime low_threshold = 0, high_threshold = 0;
803   GList *iter;
804
805   calculate_track_thresholds (demux, fragment_duration,
806       &low_threshold, &high_threshold);
807
808   /* If there are no tracks at all, don't wait. If there are no active
809    * tracks, keep filling until at least one track is full. If there
810    * are active tracks, require that they are all full */
811   TRACKS_LOCK (demux);
812   for (iter = stream->tracks; iter; iter = iter->next) {
813     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
814
815     /* Update the buffering threshold */
816     if (low_threshold != track->buffering_threshold) {
817       /* The buffering threshold for this track changed, make sure to
818        * re-check buffering status */
819       update_buffering = TRUE;
820       track->buffering_threshold = low_threshold;
821     }
822
823     have_any_tracks = TRUE;
824     if (track->active)
825       have_active_tracks = TRUE;
826
827     if (track->level_time < high_threshold) {
828       if (track->active) {
829         need_to_wait = FALSE;
830         GST_DEBUG_OBJECT (demux,
831             "stream %p track %s has level %" GST_TIME_FORMAT
832             " - needs more data (target %" GST_TIME_FORMAT
833             ") (fragment duration %" GST_TIME_FORMAT ")", stream,
834             track->stream_id, GST_TIME_ARGS (track->level_time),
835             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
836         continue;
837       }
838     } else if (!track->active) {        /* track is over threshold and inactive */
839       have_filled_inactive = TRUE;
840     }
841
842     GST_DEBUG_OBJECT (demux,
843         "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
844         stream, track->stream_id, track->active,
845         GST_TIME_ARGS (track->level_time));
846   }
847
848   /* If there are no tracks, don't wait (we might need data to create them),
849    * or if there are active tracks that need more data to hit the threshold,
850    * don't wait. Otherwise it means all active tracks are full and we should wait */
851   if (!have_any_tracks) {
852     GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
853     need_to_wait = FALSE;
854   } else if (!have_active_tracks && !have_filled_inactive) {
855     GST_DEBUG_OBJECT (demux,
856         "stream %p has inactive tracks that need more data - not waiting",
857         stream);
858     need_to_wait = FALSE;
859   }
860
861   if (need_to_wait) {
862     for (iter = stream->tracks; iter; iter = iter->next) {
863       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
864       track->waiting_del_level = high_threshold;
865       GST_DEBUG_OBJECT (demux,
866           "Waiting for queued data on stream %p track %s to drop below %"
867           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
868           stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
869           GST_TIME_ARGS (fragment_duration));
870     }
871   }
872
873   if (update_buffering) {
874     demux_update_buffering_locked (demux);
875     demux_post_buffering_locked (demux);
876   }
877
878   TRACKS_UNLOCK (demux);
879
880   return need_to_wait;
881 }
882
883 static GstAdaptiveDemuxTrack *
884 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
885 {
886   GList *tmp;
887   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
888   gint num_possible_tracks = 0;
889   GstStream *gst_stream;
890   const gchar *internal_stream_id;
891   GstStreamType stream_type;
892
893   gst_stream = gst_pad_get_stream (pad);
894
895   /* FIXME: Edward: Added assertion because I don't see in what cases we would
896    * end up with a pad from parsebin which wouldn't have an associated
897    * GstStream. */
898   g_assert (gst_stream);
899
900   internal_stream_id = gst_stream_get_stream_id (gst_stream);
901   stream_type = gst_stream_get_stream_type (gst_stream);
902
903   GST_DEBUG_OBJECT (pad,
904       "Trying to match pad from parsebin with internal streamid %s and caps %"
905       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
906       gst_stream_get_caps (gst_stream));
907
908   /* Try to match directly by the track's pending upstream_stream_id */
909   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
910     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
911
912     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
913       continue;
914
915     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
916         track->upstream_stream_id);
917
918     if (first_matched_track == NULL)
919       first_matched_track = track;
920     num_possible_tracks++;
921
922     /* If this track has a desired upstream stream id, match on it */
923     if (track->upstream_stream_id == NULL ||
924         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
925       /* This is not the track for this pad */
926       continue;
927     }
928
929     /* Remove pending upstream id (we have matched it for the pending
930      * stream_id) */
931     g_free (track->upstream_stream_id);
932     track->upstream_stream_id = NULL;
933     found_track = track;
934     break;
935   }
936
937   if (found_track == NULL) {
938     /* If we arrive here, it means the stream is switching pads after
939      * the stream has already started running */
940     /* No track is currently waiting for this particular stream id -
941      * try and match an existing linked track. If there's only 1 possible,
942      * take it. */
943     if (num_possible_tracks == 1 && first_matched_track != NULL) {
944       GST_LOG_OBJECT (pad, "Only one possible track to link to");
945       found_track = first_matched_track;
946     }
947   }
948
949   if (found_track == NULL) {
950     /* TODO: There are multiple possible tracks, need to match based
951      * on language code and caps. Have you found a stream like this? */
952     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
953   }
954
955   if (found_track != NULL) {
956     if (!gst_pad_is_linked (found_track->sinkpad)) {
957       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
958           found_track->sinkpad);
959
960       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
961         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
962         /* FIXME : Do something if we can't link ? */
963       }
964     } else {
965       /* Store pad as pending link */
966       GST_LOG_OBJECT (pad,
967           "Remembering pad to be linked when current pad is unlinked");
968       g_assert (found_track->pending_srcpad == NULL);
969       found_track->pending_srcpad = gst_object_ref (pad);
970     }
971   }
972
973   if (gst_stream)
974     gst_object_unref (gst_stream);
975
976   return found_track;
977 }
978
979 static void
980 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
981     GstAdaptiveDemux2Stream * stream)
982 {
983   GList *iter;
984   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
985
986   /* Remove from pending source pad */
987   TRACKS_LOCK (stream->demux);
988   for (iter = stream->tracks; iter; iter = iter->next) {
989     GstAdaptiveDemuxTrack *track = iter->data;
990     if (track->pending_srcpad == pad) {
991       gst_object_unref (track->pending_srcpad);
992       track->pending_srcpad = NULL;
993       break;
994     }
995   }
996   TRACKS_UNLOCK (stream->demux);
997 }
998
999 static void
1000 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1001     GstAdaptiveDemux2Stream * stream)
1002 {
1003   if (!GST_PAD_IS_SRC (pad))
1004     return;
1005
1006   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1007
1008   if (!match_parsebin_to_track (stream, pad))
1009     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1010
1011   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1012 }
1013
1014 static void
1015 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1016     GstElement * element, GstAdaptiveDemux * demux)
1017 {
1018   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1019     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1020     g_object_set (element, "ignore-pcr", TRUE, NULL);
1021   }
1022 }
1023
1024 /* must be called with manifest_lock taken */
1025 static gboolean
1026 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1027 {
1028   GstAdaptiveDemux *demux = stream->demux;
1029
1030   if (stream->parsebin == NULL) {
1031     GstEvent *event;
1032
1033     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1034
1035     /* Workaround to detect if tsdemux is being used */
1036     if (tsdemux_type == 0) {
1037       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1038       if (element) {
1039         tsdemux_type = G_OBJECT_TYPE (element);
1040         gst_object_unref (element);
1041       }
1042     }
1043
1044     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1045     if (tsdemux_type)
1046       g_signal_connect (stream->parsebin, "deep-element-added",
1047           (GCallback) parsebin_deep_element_added_cb, demux);
1048     gst_bin_add (GST_BIN_CAST (demux), stream->parsebin);
1049     stream->parsebin_sink =
1050         gst_element_get_static_pad (stream->parsebin, "sink");
1051     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1052         G_CALLBACK (parsebin_pad_added_cb), stream);
1053     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1054         G_CALLBACK (parsebin_pad_removed_cb), stream);
1055
1056     event = gst_event_new_stream_start ("bogus");
1057     if (demux->have_group_id)
1058       gst_event_set_group_id (event, demux->group_id);
1059
1060     gst_pad_send_event (stream->parsebin_sink, event);
1061
1062     /* Not sure if these need to be outside the manifest lock: */
1063     gst_element_sync_state_with_parent (stream->parsebin);
1064     stream->last_status_code = 200;     /* default to OK */
1065   }
1066   return TRUE;
1067 }
1068
1069 static void
1070 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1071     GstAdaptiveDemux2Stream * stream)
1072 {
1073 }
1074
1075 static void
1076 on_download_error (DownloadRequest * request, DownloadRequestState state,
1077     GstAdaptiveDemux2Stream * stream)
1078 {
1079   GstAdaptiveDemux *demux = stream->demux;
1080   guint last_status_code = request->status_code;
1081   gboolean live;
1082
1083   stream->download_active = FALSE;
1084   stream->last_status_code = last_status_code;
1085
1086   GST_DEBUG_OBJECT (stream,
1087       "Download finished with error, request state %d http status %u, dc %d",
1088       request->state, last_status_code, stream->download_error_count);
1089
1090   live = gst_adaptive_demux_is_live (demux);
1091   if (((last_status_code / 100 == 4 && live)
1092           || last_status_code / 100 == 5)) {
1093     /* 4xx/5xx */
1094     /* if current position is before available start, switch to next */
1095     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1096       goto flushing;
1097
1098     if (live) {
1099       gint64 range_start, range_stop;
1100
1101       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1102               &range_stop))
1103         goto flushing;
1104
1105       if (demux->segment.position < range_start) {
1106         GstFlowReturn ret;
1107
1108         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1109         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1110
1111         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1112
1113         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1114         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1115             gst_flow_get_name (ret));
1116
1117         if (ret == GST_FLOW_OK)
1118           goto again;
1119
1120       } else if (demux->segment.position > range_stop) {
1121         /* wait a bit to be in range, we don't have any locks at that point */
1122         GstClockTime wait_time =
1123             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1124             stream);
1125         if (wait_time > 0) {
1126           GST_DEBUG_OBJECT (stream,
1127               "Download waiting for %" GST_TIME_FORMAT,
1128               GST_TIME_ARGS (wait_time));
1129           g_assert (stream->pending_cb_id == 0);
1130           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1131           stream->pending_cb_id =
1132               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1133               wait_time,
1134               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1135               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1136         }
1137       }
1138     }
1139
1140   flushing:
1141     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1142       /* looks like there is no way of knowing when a live stream has ended
1143        * Have to assume we are falling behind and cause a manifest reload */
1144       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1145       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1146       return;
1147     }
1148   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1149     /* If this is the last fragment, consider failures EOS and not actual
1150      * errors. Due to rounding errors in the durations, the last fragment
1151      * might not actually exist */
1152     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1153     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1154     return;
1155   } else {
1156     /* retry same segment */
1157     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1158       gst_adaptive_demux2_stream_error (stream);
1159       return;
1160     }
1161     goto again;
1162   }
1163
1164 again:
1165   /* wait a short time in case the server needs a bit to recover */
1166   GST_LOG_OBJECT (stream,
1167       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1168   g_assert (stream->pending_cb_id == 0);
1169   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1170       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1171       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1172 }
1173
1174 static void
1175 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1176     DownloadRequest * request)
1177 {
1178   GstClockTimeDiff last_download_duration;
1179   guint64 fragment_bytes_downloaded = request->content_received;
1180
1181   /* The stream last_download time tracks the full download time for now */
1182   stream->last_download_time =
1183       GST_CLOCK_DIFF (request->download_request_time,
1184       request->download_end_time);
1185
1186   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1187   last_download_duration =
1188       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1189
1190   /* If the entire response arrived in the first buffer
1191    * though, include the request time to get a valid
1192    * bitrate estimate */
1193   if (last_download_duration < 2 * stream->last_download_time)
1194     last_download_duration = stream->last_download_time;
1195
1196   if (last_download_duration > 0) {
1197     stream->last_bitrate =
1198         gst_util_uint64_scale (fragment_bytes_downloaded,
1199         8 * GST_SECOND, last_download_duration);
1200
1201     GST_DEBUG_OBJECT (stream,
1202         "Updated stream bitrate. %" G_GUINT64_FORMAT
1203         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1204         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1205         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1206   }
1207 }
1208
1209 static void
1210 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1211     GstAdaptiveDemux2Stream * stream)
1212 {
1213   GstAdaptiveDemux *demux = stream->demux;
1214   GstBuffer *buffer = download_request_take_buffer (request);
1215
1216   if (buffer) {
1217     GstFlowReturn ret;
1218
1219     GST_DEBUG_OBJECT (stream,
1220         "Handling buffer of %" G_GSIZE_FORMAT
1221         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1222         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1223         request->content_received, request->content_length);
1224
1225     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1226     download_request_unlock (request);
1227     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1228     download_request_lock (request);
1229
1230     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1231       return;
1232
1233     if (ret != GST_FLOW_OK) {
1234       GST_DEBUG_OBJECT (stream,
1235           "Buffer parsing returned: %d %s. Aborting download", ret,
1236           gst_flow_get_name (ret));
1237
1238       if (!stream->downloading_header && !stream->downloading_index)
1239         update_stream_bitrate (stream, request);
1240
1241       downloadhelper_cancel_request (demux->download_helper, request);
1242
1243       /* cancellation is async, so recycle our download request to avoid races */
1244       download_request_unref (stream->download_request);
1245       stream->download_request = download_request_new ();
1246
1247       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1248     }
1249   }
1250 }
1251
1252 static void
1253 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1254     GstAdaptiveDemux2Stream * stream)
1255 {
1256   GstFlowReturn ret = GST_FLOW_OK;
1257   GstBuffer *buffer;
1258
1259   stream->download_active = FALSE;
1260
1261   if (G_UNLIKELY (stream->cancelled))
1262     return;
1263
1264   GST_DEBUG_OBJECT (stream,
1265       "Stream %p %s download for %s is complete with state %d",
1266       stream, uritype (stream), request->uri, request->state);
1267
1268   /* Update bitrate for fragment downloads */
1269   if (!stream->downloading_header && !stream->downloading_index)
1270     update_stream_bitrate (stream, request);
1271
1272   buffer = download_request_take_buffer (request);
1273   if (buffer)
1274     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1275
1276   GST_DEBUG_OBJECT (stream,
1277       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1278       request->uri, ret, gst_flow_get_name (ret), stream->state);
1279
1280   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1281     return;
1282
1283   g_assert (stream->pending_cb_id == 0);
1284   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1285 }
1286
1287 /* must be called from the scheduler context
1288  *
1289  * Will submit the request only, which will complete asynchronously
1290  */
1291 static GstFlowReturn
1292 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1293     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1294     gint64 end)
1295 {
1296   DownloadRequest *request = stream->download_request;
1297
1298   GST_DEBUG_OBJECT (demux,
1299       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1300       uritype (stream), uri, start, end);
1301
1302   if (!gst_adaptive_demux2_stream_create_parser (stream))
1303     return GST_FLOW_ERROR;
1304
1305   /* Configure our download request */
1306   download_request_set_uri (request, uri, start, end);
1307
1308   if (stream->downloading_header || stream->downloading_index) {
1309     download_request_set_callbacks (request,
1310         (DownloadRequestEventCallback) on_download_complete,
1311         (DownloadRequestEventCallback) on_download_error,
1312         (DownloadRequestEventCallback) on_download_cancellation,
1313         (DownloadRequestEventCallback) NULL, stream);
1314   } else {
1315     download_request_set_callbacks (request,
1316         (DownloadRequestEventCallback) on_download_complete,
1317         (DownloadRequestEventCallback) on_download_error,
1318         (DownloadRequestEventCallback) on_download_cancellation,
1319         (DownloadRequestEventCallback) on_download_progress, stream);
1320   }
1321
1322   if (!downloadhelper_submit_request (demux->download_helper,
1323           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1324     return GST_FLOW_ERROR;
1325
1326   stream->download_active = TRUE;
1327
1328   return GST_FLOW_OK;
1329 }
1330
1331 /* must be called from the scheduler context */
1332 static GstFlowReturn
1333 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1334 {
1335   GstAdaptiveDemux *demux = stream->demux;
1336   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1337   gchar *url = NULL;
1338
1339   /* FIXME :  */
1340   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1341   if (stream->starting_fragment) {
1342     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1343         stream->fragment.uri ? "FRAGMENT " : "",
1344         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1345         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1346
1347     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1348         stream->fragment.index_uri == NULL)
1349       goto no_url_error;
1350
1351     stream->first_fragment_buffer = TRUE;
1352     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1353   }
1354
1355   if (stream->need_header && stream->fragment.header_uri != NULL) {
1356
1357     /* Set the need_index flag when we start the header if we'll also need the index */
1358     stream->need_index = (stream->fragment.index_uri != NULL);
1359
1360     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1361         G_GINT64_FORMAT, stream->fragment.header_uri,
1362         stream->fragment.header_range_start, stream->fragment.header_range_end);
1363
1364     stream->downloading_header = TRUE;
1365
1366     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1367         stream->fragment.header_uri, stream->fragment.header_range_start,
1368         stream->fragment.header_range_end);
1369   }
1370
1371   /* check if we have an index */
1372   if (stream->need_index && stream->fragment.index_uri != NULL) {
1373     GST_DEBUG_OBJECT (stream,
1374         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1375         stream->fragment.index_uri,
1376         stream->fragment.index_range_start, stream->fragment.index_range_end);
1377
1378     stream->downloading_index = TRUE;
1379
1380     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1381         stream->fragment.index_uri, stream->fragment.index_range_start,
1382         stream->fragment.index_range_end);
1383   }
1384
1385   url = stream->fragment.uri;
1386   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1387   if (!url)
1388     return GST_FLOW_OK;
1389
1390   /* Download the actual fragment, either in chunks or in one go */
1391   stream->first_fragment_buffer = TRUE;
1392
1393   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1394       && stream->fragment.chunk_size != 0) {
1395     /* Handle chunk downloading */
1396     gint64 range_start = stream->fragment.range_start;
1397     gint64 range_end = stream->fragment.range_end;
1398     gint chunk_size = stream->fragment.chunk_size;
1399     gint64 chunk_end;
1400
1401     /* HTTP ranges are inclusive for the end */
1402     if (chunk_size != -1) {
1403       chunk_end = range_start + chunk_size - 1;
1404       if (range_end != -1 && range_end < chunk_end)
1405         chunk_end = range_end;
1406     } else {
1407       chunk_end = range_end;
1408     }
1409
1410     GST_DEBUG_OBJECT (stream,
1411         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1412         url, range_start, chunk_end);
1413     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1414         range_start, chunk_end);
1415   }
1416
1417   /* regular single chunk download */
1418   stream->fragment.chunk_size = 0;
1419
1420   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1421       stream->fragment.range_start, stream->fragment.range_end);
1422
1423 no_url_error:
1424   {
1425     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1426         (_("Failed to get fragment URL.")),
1427         ("An error happened when getting fragment URL"));
1428     return GST_FLOW_ERROR;
1429   }
1430 }
1431
1432 static gboolean
1433 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1434     GstEvent * event)
1435 {
1436   gboolean ret = TRUE;
1437   GstPad *pad;
1438
1439   /* If there's a parsebin, push the event through it */
1440   if (stream->parsebin_sink != NULL) {
1441     pad = gst_object_ref (stream->parsebin_sink);
1442     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1443     ret = gst_pad_send_event (pad, gst_event_ref (event));
1444     gst_object_unref (pad);
1445   }
1446
1447   /* If the event is EOS, ensure that all tracks are EOS. This catches
1448    * the case where the parsebin hasn't parsed anything yet (we switched
1449    * to a never before used track right near EOS, or it didn't parse enough
1450    * to create pads and be able to send EOS through to the tracks.
1451    *
1452    * We don't need to care about any other events
1453    */
1454   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1455     GstAdaptiveDemux *demux = stream->demux;
1456     GList *iter;
1457
1458     TRACKS_LOCK (demux);
1459     for (iter = stream->tracks; iter; iter = iter->next) {
1460       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1461       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1462     }
1463     TRACKS_UNLOCK (demux);
1464   }
1465
1466   gst_event_unref (event);
1467   return ret;
1468 }
1469
1470 static void
1471 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1472 {
1473   GstAdaptiveDemux *demux = stream->demux;
1474   GstMessage *msg;
1475   GstStructure *details;
1476
1477   details = gst_structure_new_empty ("details");
1478   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1479       stream->last_status_code, NULL);
1480
1481   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1482
1483   if (stream->last_error) {
1484     gchar *debug = g_strdup_printf ("Error on stream %s",
1485         GST_OBJECT_NAME (stream));
1486     msg =
1487         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1488         stream->last_error, debug, details);
1489     GST_ERROR_OBJECT (stream, "Download error: %s",
1490         stream->last_error->message);
1491     g_free (debug);
1492   } else {
1493     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1494         _("Couldn't download fragments"));
1495     msg =
1496         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1497         "Fragment downloading has failed consecutive times", details);
1498     g_error_free (err);
1499     GST_ERROR_OBJECT (stream,
1500         "Download error: Couldn't download fragments, too many failures");
1501   }
1502
1503   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1504 }
1505
1506 /* Called when a stream reaches the end of a playback segment */
1507 static void
1508 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1509 {
1510   GstAdaptiveDemux *demux = stream->demux;
1511   GstFlowReturn combined =
1512       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1513
1514   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1515
1516   if (demux->priv->outputs) {
1517     GstEvent *eos = gst_event_new_eos ();
1518
1519     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1520     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1521
1522     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1523     gst_adaptive_demux2_stream_push_event (stream, eos);
1524   } else {
1525     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1526     gst_adaptive_demux2_stream_error (stream);
1527   }
1528
1529   if (combined == GST_FLOW_EOS && gst_adaptive_demux_has_next_period (demux)) {
1530     GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1531     gst_adaptive_demux_advance_period (demux);
1532   }
1533 }
1534
1535 static gboolean
1536 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1537 {
1538   GstAdaptiveDemux *demux = stream->demux;
1539
1540   gboolean is_live = gst_adaptive_demux_is_live (demux);
1541
1542   stream->pending_cb_id = 0;
1543
1544   /* Refetch the playlist now after we waited */
1545   /* FIXME: Make this manifest update async and handle it on completion */
1546   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1547     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1548   }
1549
1550   /* We were called here from a timeout, so if the load function wants to loop
1551    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1552    * way */
1553   while (gst_adaptive_demux2_stream_next_download (stream));
1554
1555   return G_SOURCE_REMOVE;
1556 }
1557
1558 static gboolean
1559 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1560     * stream)
1561 {
1562   /* If the state already moved on, the stream was stopped, or another track
1563    * already woke up and needed data */
1564   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1565     return G_SOURCE_REMOVE;
1566
1567   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1568
1569   return G_SOURCE_REMOVE;
1570 }
1571
1572 void
1573 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1574     stream)
1575 {
1576   GstAdaptiveDemux *demux = stream->demux;
1577   GList *iter;
1578
1579   for (iter = stream->tracks; iter; iter = iter->next) {
1580     GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
1581     tmp_track->waiting_del_level = 0;
1582   }
1583
1584   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1585
1586   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1587       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1588       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1589 }
1590
1591 void
1592 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1593 {
1594   GstAdaptiveDemux *demux = stream->demux;
1595
1596   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1597     return;
1598
1599   g_assert (stream->pending_cb_id == 0);
1600
1601   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1602   stream->pending_cb_id =
1603       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1604       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1605       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1606 }
1607
1608 static void
1609 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1610     stream)
1611 {
1612   GstAdaptiveDemux *demux = stream->demux;
1613
1614   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1615           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1616
1617     if (!gst_adaptive_demux_has_next_period (demux)) {
1618       /* Wait only if we can ensure current manifest has been expired.
1619        * The meaning "we have next period" *WITH* EOS is that, current
1620        * period has been ended but we can continue to the next period */
1621       GST_DEBUG_OBJECT (stream,
1622           "Live playlist EOS - waiting for manifest update");
1623       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1624       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1625       return;
1626     }
1627
1628     if (stream->replaced)
1629       return;
1630   }
1631
1632   gst_adaptive_demux2_stream_end_of_manifest (stream);
1633 }
1634
1635 static gboolean
1636 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1637 {
1638   GstAdaptiveDemux *demux = stream->demux;
1639   gboolean live = gst_adaptive_demux_is_live (demux);
1640   GstFlowReturn ret = GST_FLOW_OK;
1641
1642   stream->pending_cb_id = 0;
1643
1644   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1645
1646   switch (stream->state) {
1647     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1648     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1649     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1650     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1651     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1652       /* Get information about the fragment to download */
1653       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1654       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1655       GST_DEBUG_OBJECT (stream,
1656           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1657
1658       if (ret == GST_FLOW_OK)
1659         stream->starting_fragment = TRUE;
1660       break;
1661     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1662       break;
1663     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1664       GST_ERROR_OBJECT (stream,
1665           "Unexpected stream state EOS. The stream should not be running now.");
1666       return FALSE;
1667     default:
1668       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1669       g_assert_not_reached ();
1670       break;
1671   }
1672
1673   if (ret == GST_FLOW_OK) {
1674     /* Wait for room in the output tracks */
1675     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1676             stream->fragment.duration)) {
1677       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1678       return FALSE;
1679     }
1680   }
1681
1682   if (ret == GST_FLOW_OK) {
1683     /* wait for live fragments to be available */
1684     if (live) {
1685       GstClockTime wait_time =
1686           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1687       if (wait_time > 0) {
1688         GST_DEBUG_OBJECT (stream,
1689             "Download waiting for %" GST_TIME_FORMAT,
1690             GST_TIME_ARGS (wait_time));
1691
1692         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1693
1694         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1695         g_assert (stream->pending_cb_id == 0);
1696         stream->pending_cb_id =
1697             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1698             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1699             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1700         return FALSE;
1701       }
1702     }
1703
1704     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1705       GST_ERROR_OBJECT (demux,
1706           "Failed to begin fragment download for stream %p", stream);
1707       return FALSE;
1708     }
1709   }
1710
1711   switch (ret) {
1712     case GST_FLOW_OK:
1713       break;                    /* all is good, let's go */
1714     case GST_FLOW_EOS:
1715       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1716       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1717       return FALSE;
1718     case GST_FLOW_NOT_LINKED:
1719     {
1720       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1721
1722       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1723           == GST_FLOW_NOT_LINKED) {
1724         GST_ELEMENT_FLOW_ERROR (demux, ret);
1725       }
1726     }
1727       break;
1728
1729     case GST_FLOW_FLUSHING:
1730       /* Flushing is normal, the target track might have been unselected */
1731       if (G_UNLIKELY (stream->cancelled))
1732         return FALSE;
1733
1734     default:
1735       if (ret <= GST_FLOW_ERROR) {
1736         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1737         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1738           gst_adaptive_demux2_stream_error (stream);
1739           return FALSE;
1740         }
1741
1742         g_clear_error (&stream->last_error);
1743
1744         /* First try to update the playlist for non-live playlists
1745          * in case the URIs have changed in the meantime. But only
1746          * try it the first time, after that we're going to wait a
1747          * a bit to not flood the server */
1748         if (stream->download_error_count == 1
1749             && !gst_adaptive_demux_is_live (demux)) {
1750           /* TODO hlsdemux had more options to this function (boolean and err) */
1751           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1752             /* Retry immediately, the playlist actually has changed */
1753             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1754             return TRUE;
1755           }
1756         }
1757
1758         /* Wait half the fragment duration before retrying */
1759         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1760         g_assert (stream->pending_cb_id == 0);
1761         stream->pending_cb_id =
1762             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1763             stream->fragment.duration / 2,
1764             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1765             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1766         return FALSE;
1767       }
1768       break;
1769   }
1770
1771   return FALSE;
1772 }
1773
1774 static gboolean
1775 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1776 {
1777   GstAdaptiveDemux *demux = stream->demux;
1778   gboolean end_of_manifest = FALSE;
1779
1780   GST_LOG_OBJECT (stream, "Looking for next download");
1781
1782   /* Restarting download, figure out new position
1783    * FIXME : Move this to a separate function ? */
1784   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1785     GstClockTimeDiff stream_time = 0;
1786
1787     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1788
1789     if (stream->parsebin_sink != NULL) {
1790       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1791        * this is the first time we've used this stream, so it's all good) */
1792       gst_adaptive_demux2_stream_push_event (stream,
1793           gst_event_new_flush_start ());
1794       gst_adaptive_demux2_stream_push_event (stream,
1795           gst_event_new_flush_stop (FALSE));
1796     }
1797
1798     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1799     stream_time = stream->start_position;
1800
1801     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1802         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1803
1804     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1805       /* TODO check return */
1806       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1807           0, stream_time, &stream_time);
1808       stream->current_position = stream->start_position;
1809
1810       GST_DEBUG_OBJECT (stream,
1811           "stream_time after restart seek: %" GST_STIME_FORMAT
1812           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1813           GST_STIME_ARGS (stream->current_position));
1814     }
1815
1816     /* Trigger (re)computation of the parsebin input segment */
1817     stream->compute_segment = TRUE;
1818
1819     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1820
1821     stream->discont = TRUE;
1822     stream->need_header = TRUE;
1823     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1824   }
1825
1826   /* Check if we're done with our segment */
1827   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1828   if (demux->segment.rate > 0) {
1829     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1830         && stream->current_position >= demux->segment.stop) {
1831       end_of_manifest = TRUE;
1832     }
1833   } else {
1834     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1835         && stream->current_position <= demux->segment.start) {
1836       end_of_manifest = TRUE;
1837     }
1838   }
1839   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1840
1841   if (end_of_manifest) {
1842     gst_adaptive_demux2_stream_end_of_manifest (stream);
1843     return FALSE;
1844   }
1845   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1846 }
1847
1848 static gboolean
1849 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1850 {
1851   GstAdaptiveDemux *demux = stream->demux;
1852   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1853
1854   if (!klass->stream_can_start)
1855     return TRUE;
1856   return klass->stream_can_start (demux, stream);
1857 }
1858
1859 /**
1860  * gst_adaptive_demux2_stream_start:
1861  * @stream: a #GstAdaptiveDemux2Stream
1862  *
1863  * Start the given @stream. Should be called by subclasses that previously
1864  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1865  */
1866 void
1867 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1868 {
1869   GstAdaptiveDemux *demux;
1870
1871   g_return_if_fail (stream && stream->demux);
1872
1873   demux = stream->demux;
1874
1875   if (stream->pending_cb_id != 0 || stream->download_active) {
1876     /* There is already something active / pending on this stream */
1877     GST_LOG_OBJECT (stream, "Stream already running");
1878     return;
1879   }
1880
1881   /* Some streams require a delayed start, i.e. they need more information
1882    * before they can actually be started */
1883   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1884     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1885     return;
1886   }
1887
1888   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1889     GST_LOG_OBJECT (stream, "Stream is EOS already");
1890     return;
1891   }
1892
1893   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1894       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
1895     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
1896         stream->state);
1897     stream->cancelled = FALSE;
1898     stream->replaced = FALSE;
1899     stream->last_ret = GST_FLOW_OK;
1900
1901     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1902       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1903   }
1904
1905   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
1906   stream->pending_cb_id =
1907       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1908       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
1909       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1910 }
1911
1912 void
1913 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
1914 {
1915   GstAdaptiveDemux *demux = stream->demux;
1916
1917   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
1918   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1919
1920   if (stream->pending_cb_id != 0) {
1921     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
1922         stream->pending_cb_id);
1923     stream->pending_cb_id = 0;
1924   }
1925
1926   /* Cancel and drop the existing download request */
1927   downloadhelper_cancel_request (demux->download_helper,
1928       stream->download_request);
1929   download_request_unref (stream->download_request);
1930   stream->downloading_header = stream->downloading_index = FALSE;
1931   stream->download_request = download_request_new ();
1932   stream->download_active = FALSE;
1933 }
1934
1935 gboolean
1936 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
1937 {
1938   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1939     return FALSE;
1940   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
1941     return FALSE;
1942   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
1943     return FALSE;
1944   return TRUE;
1945 }
1946
1947 gboolean
1948 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
1949 {
1950   GList *tmp;
1951
1952   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
1953     GstAdaptiveDemuxTrack *track = tmp->data;
1954     if (track->selected)
1955       return TRUE;
1956   }
1957
1958   return FALSE;
1959 }
1960
1961 /**
1962  * gst_adaptive_demux2_stream_is_selected:
1963  * @stream: A #GstAdaptiveDemux2Stream
1964  *
1965  * Returns: %TRUE if any of the tracks targetted by @stream is selected
1966  */
1967 gboolean
1968 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
1969 {
1970   gboolean ret;
1971
1972   g_return_val_if_fail (stream && stream->demux, FALSE);
1973
1974   TRACKS_LOCK (stream->demux);
1975   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
1976   TRACKS_UNLOCK (stream->demux);
1977
1978   return ret;
1979 }