adaptivedemux2: stream: Set period has_next_period flag before EOS
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
44     GST_TYPE_OBJECT);
45
46 static void
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
48 {
49   GObjectClass *gobject_class = (GObjectClass *) klass;
50
51   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
52 }
53
54 static GType tsdemux_type = 0;
55
56 static void
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
58 {
59   stream->download_request = download_request_new ();
60   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61   stream->last_ret = GST_FLOW_OK;
62
63   stream->fragment_bitrates =
64       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
65
66   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
67
68   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
69 }
70
71 /* must be called with manifest_lock taken.
72  * It will temporarily drop the manifest_lock in order to join the task.
73  * It will join only the old_streams (the demux->streams are joined by
74  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
75  * called)
76  */
77 static void
78 gst_adaptive_demux2_stream_finalize (GObject * object)
79 {
80   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
81
82   GST_LOG_OBJECT (object, "Finalizing");
83
84   if (stream->download_request)
85     download_request_unref (stream->download_request);
86
87   stream->cancelled = TRUE;
88   g_clear_error (&stream->last_error);
89
90   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
91
92   if (stream->pending_events) {
93     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
94     stream->pending_events = NULL;
95   }
96
97   if (stream->parsebin_sink) {
98     gst_object_unref (stream->parsebin_sink);
99     stream->parsebin_sink = NULL;
100   }
101
102   if (stream->pad_added_id)
103     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
104   if (stream->pad_removed_id)
105     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
106
107   if (stream->parsebin != NULL) {
108     GST_LOG_OBJECT (stream, "Removing parsebin");
109     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
110     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
111     gst_object_unref (stream->parsebin);
112     stream->parsebin = NULL;
113   }
114
115   g_free (stream->fragment_bitrates);
116
117   g_list_free_full (stream->tracks,
118       (GDestroyNotify) gst_adaptive_demux_track_unref);
119
120   if (stream->pending_caps)
121     gst_caps_unref (stream->pending_caps);
122
123   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
124   g_clear_pointer (&stream->stream_collection, gst_object_unref);
125
126   G_OBJECT_CLASS (parent_class)->finalize (object);
127 }
128
129 /**
130  * gst_adaptive_demux2_stream_add_track:
131  * @stream: A #GstAdaptiveDemux2Stream
132  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
133  *
134  * This function is called when a subclass knows of a target @track that this
135  * @stream can provide.
136  */
137 gboolean
138 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
139     GstAdaptiveDemuxTrack * track)
140 {
141   g_return_val_if_fail (track != NULL, FALSE);
142
143   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
144       track->stream_id);
145   if (g_list_find (stream->tracks, track)) {
146     GST_DEBUG_OBJECT (stream->demux,
147         "track '%s' already handled by this stream", track->stream_id);
148     return FALSE;
149   }
150
151   stream->tracks =
152       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
153   if (stream->demux) {
154     g_assert (stream->period);
155     gst_adaptive_demux_period_add_track (stream->period, track);
156   }
157   return TRUE;
158 }
159
160 static gboolean
161 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
162 static gboolean
163 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
164 static void
165 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
166     stream);
167 static GstFlowReturn
168 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
169     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
170     gint64 end);
171
172 #ifndef GST_DISABLE_GST_DEBUG
173 static const char *
174 uritype (GstAdaptiveDemux2Stream * s)
175 {
176   if (s->downloading_header)
177     return "header";
178   if (s->downloading_index)
179     return "index";
180   return "fragment";
181 }
182 #endif
183
184 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
185 static gboolean
186 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
187 {
188   GstAdaptiveDemux *demux = stream->demux;
189   DownloadRequest *request = stream->download_request;
190   GstFlowReturn ret;
191
192   gchar *uri = request->uri;
193   gint64 range_start = request->range_start;
194   gint64 range_end = request->range_end;
195   gint64 chunk_size;
196   gint64 chunk_end;
197
198   if (range_end == -1)
199     return FALSE;               /* This was a request to the end, no more to load */
200
201   /* The size of the request that just completed: */
202   chunk_size = range_end + 1 - range_start;
203
204   if (request->content_received < chunk_size)
205     return FALSE;               /* Short read - we're done */
206
207   /* Accumulate the data we just fetched, to figure out the next
208    * request start position and update the target chunk size from
209    * the updated stream fragment info */
210   range_start += chunk_size;
211   range_end = stream->fragment.range_end;
212   chunk_size = stream->fragment.chunk_size;
213
214   if (chunk_size == 0)
215     return FALSE;               /* Sub-class doesn't want another chunk */
216
217   /* HTTP ranges are inclusive for the end */
218   if (chunk_size != -1) {
219     chunk_end = range_start + chunk_size - 1;
220     if (range_end != -1 && range_end < chunk_end)
221       chunk_end = range_end;
222   } else {
223     chunk_end = range_end;
224   }
225
226   GST_DEBUG_OBJECT (stream,
227       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
228       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
229
230   ret =
231       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
232       range_start, chunk_end);
233   if (ret != GST_FLOW_OK) {
234     GST_DEBUG_OBJECT (stream,
235         "Stopping stream due to begin download failure - ret %s",
236         gst_flow_get_name (ret));
237     gst_adaptive_demux2_stream_stop (stream);
238     return FALSE;
239   }
240
241   return TRUE;
242 }
243
244 static void
245 drain_inactive_tracks (GstAdaptiveDemux * demux,
246     GstAdaptiveDemux2Stream * stream)
247 {
248   GList *iter;
249
250   TRACKS_LOCK (demux);
251   for (iter = stream->tracks; iter; iter = iter->next) {
252     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
253     if (!track->selected) {
254       gst_adaptive_demux_track_drain_to (track,
255           demux->priv->global_output_position);
256     }
257   }
258
259   TRACKS_UNLOCK (demux);
260 }
261
262 /* Called to complete a download, either due to failure or completion
263  * Should set up the next download if necessary */
264 static void
265 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
266     stream, GstFlowReturn ret, GError * err)
267 {
268   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
269   GstAdaptiveDemux *demux = stream->demux;
270
271   GST_DEBUG_OBJECT (stream,
272       "%s download finish: %d %s - err: %p", uritype (stream), ret,
273       gst_flow_get_name (ret), err);
274
275   stream->download_finished = TRUE;
276
277   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
278    * which can look at the last_ret - so make sure it's stored before calling that.
279    * Also, for not-linked or other errors passed in that are going to make
280    * this stream stop, we'll need to store it */
281   stream->last_ret = ret;
282
283   if (err) {
284     g_clear_error (&stream->last_error);
285     stream->last_error = g_error_copy (err);
286   }
287
288   /* For actual errors, stop now, no need to call finish_fragment and get
289    * confused if it returns a non-error status, but if EOS was passed in,
290    * continue and check whether finish_fragment() says we've finished
291    * the whole manifest or just this fragment */
292   if (ret < 0 && ret != GST_FLOW_EOS) {
293     GST_INFO_OBJECT (stream,
294         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
295     gst_adaptive_demux2_stream_stop (stream);
296     return;
297   }
298
299   /* Handle all the possible flow returns here: */
300   if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
301     /* The sub-class wants to stop the fragment immediately */
302     stream->fragment.finished = TRUE;
303     ret = klass->finish_fragment (demux, stream);
304
305     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
306         gst_flow_get_name (ret));
307   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
308     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
309     /* Just mark the fragment as finished */
310     stream->fragment.finished = TRUE;
311     ret = GST_FLOW_OK;
312   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
313       || !klass->need_another_chunk (stream)
314       || stream->fragment.chunk_size == 0) {
315     stream->fragment.finished = TRUE;
316     ret = klass->finish_fragment (stream->demux, stream);
317
318     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
319         gst_flow_get_name (ret));
320   } else if (stream->fragment.chunk_size != 0
321       && schedule_another_chunk (stream)) {
322     /* Another download has already begun, no need to queue anything below */
323     return;
324   }
325
326   /* For HLS, we might be enqueueing data into tracks that aren't
327    * selected. Drain those ones out */
328   drain_inactive_tracks (stream->demux, stream);
329
330   /* Now that we've called finish_fragment we can clear these flags the
331    * sub-class might have checked */
332   if (stream->downloading_header) {
333     stream->need_header = FALSE;
334     stream->downloading_header = FALSE;
335   } else if (stream->downloading_index) {
336     stream->need_index = FALSE;
337     stream->downloading_index = FALSE;
338     /* Restart the fragment again now that header + index were loaded
339      * so that get_fragment_info() will be called again */
340     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
341   } else {
342     /* Finishing a fragment data download. Try for another */
343     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
344   }
345
346   /* if GST_FLOW_EOS was passed in that means this download is finished,
347    * but it's the result returned from finish_fragment() we really care
348    * about, as that tells us if the manifest has run out of fragments
349    * to load */
350   if (ret == GST_FLOW_EOS) {
351     stream->last_ret = ret;
352
353     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
354     return;
355   }
356
357   /* Now finally, if ret is anything other than success, we should stop this
358    * stream */
359   if (ret < 0) {
360     GST_DEBUG_OBJECT (stream,
361         "Stopping stream due to finish fragment ret %s",
362         gst_flow_get_name (ret));
363     gst_adaptive_demux2_stream_stop (stream);
364     return;
365   }
366
367   /* Clear the last_ret marker before starting a fresh download */
368   stream->last_ret = GST_FLOW_OK;
369
370   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
371   stream->pending_cb_id =
372       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
373       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
374       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
375 }
376
377 /* Must be called from the scheduler context */
378 void
379 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
380     GError * err)
381 {
382   GstAdaptiveDemux *demux = stream->demux;
383
384   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
385     return;
386
387   downloadhelper_cancel_request (demux->download_helper,
388       stream->download_request);
389
390   /* cancellation is async, so recycle our download request to avoid races */
391   download_request_unref (stream->download_request);
392   stream->download_request = download_request_new ();
393
394   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
395       err);
396 }
397
398 static void
399 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
400     GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
401 {
402   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
403   GstClockTime offset =
404       gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
405
406   stream->parse_segment = demux->segment;
407
408   /* The demuxer segment is just built from seek events, but for each stream
409    * we have to adjust segments according to the current period and the
410    * stream specific presentation time offset.
411    *
412    * For each period, buffer timestamps start again from 0. Additionally the
413    * buffer timestamps are shifted by the stream specific presentation time
414    * offset, so the first buffer timestamp of a period is 0 + presentation
415    * time offset. If the stream contains timestamps itself, this is also
416    * supposed to be the presentation time stored inside the stream.
417    *
418    * The stream time over periods is supposed to be continuous, that is the
419    * buffer timestamp 0 + presentation time offset should map to the start
420    * time of the current period.
421    *
422    *
423    * The adjustment of the stream segments as such works the following.
424    *
425    * If the demuxer segment start is bigger than the period start, this
426    * means that we have to drop some media at the beginning of the current
427    * period, e.g. because a seek into the middle of the period has
428    * happened. The amount of media to drop is the difference between the
429    * period start and the demuxer segment start, and as each period starts
430    * again from 0, this difference is going to be the actual stream's
431    * segment start. As all timestamps of the stream are shifted by the
432    * presentation time offset, we will also have to move the segment start
433    * by that offset.
434    *
435    * Likewise, the demuxer segment stop value is adjusted in the same
436    * fashion.
437    *
438    * Now the running time and stream time at the stream's segment start has
439    * to be the one that is stored inside the demuxer's segment, which means
440    * that segment.base and segment.time have to be copied over (done just
441    * above)
442    *
443    *
444    * If the demuxer segment start is smaller than the period start time,
445    * this means that the whole period is inside the segment. As each period
446    * starts timestamps from 0, and additionally timestamps are shifted by
447    * the presentation time offset, the stream's first timestamp (and as such
448    * the stream's segment start) has to be the presentation time offset.
449    * The stream time at the segment start is supposed to be the stream time
450    * of the period start according to the demuxer segment, so the stream
451    * segment's time would be set to that. The same goes for the stream
452    * segment's base, which is supposed to be the running time of the period
453    * start according to the demuxer's segment.
454    *
455    * The same logic applies for negative rates with the segment stop and
456    * the period stop time (which gets clamped).
457    *
458    *
459    * For the first case where not the complete period is inside the segment,
460    * the segment time and base as calculated by the second case would be
461    * equivalent.
462    */
463   GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
464       &demux->segment);
465   GST_DEBUG_OBJECT (demux,
466       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
467       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
468   /* note for readers:
469    * Since stream->parse_segment is initially a copy of demux->segment,
470    * only the values that need updating are modified below. */
471   if (first_and_live) {
472     /* If first and live, demuxer did seek to the current position already */
473     stream->parse_segment.start = demux->segment.start - period_start + offset;
474     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
475       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
476     /* FIXME : Do we need to handle negative rates for this ? */
477     stream->parse_segment.position = stream->parse_segment.start;
478   } else if (demux->segment.start > period_start) {
479     /* seek within a period */
480     stream->parse_segment.start = demux->segment.start - period_start + offset;
481     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
482       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
483     if (stream->parse_segment.rate >= 0)
484       stream->parse_segment.position = offset;
485     else
486       stream->parse_segment.position = stream->parse_segment.stop;
487   } else {
488     stream->parse_segment.start = offset;
489     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
490       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
491     if (stream->parse_segment.rate >= 0) {
492       stream->parse_segment.position = offset;
493       stream->parse_segment.base =
494           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
495           period_start);
496     } else {
497       stream->parse_segment.position = stream->parse_segment.stop;
498       stream->parse_segment.base =
499           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
500           period_start + demux->segment.stop - demux->segment.start);
501     }
502     stream->parse_segment.time =
503         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
504         period_start);
505   }
506
507   stream->send_segment = TRUE;
508
509   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
510       &stream->parse_segment);
511 }
512
513 /* Segment lock hold */
514 static void
515 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
516     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
517 {
518   GstClockTimeDiff pos;
519
520   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
521       GST_STIME_ARGS (stream->fragment.stream_time));
522
523   pos = stream->fragment.stream_time;
524
525   if (GST_CLOCK_STIME_IS_VALID (pos)) {
526     GstClockTime offset =
527         gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
528
529     pos += offset;
530
531     if (pos < 0) {
532       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
533       pos = 0;
534     }
535
536     GST_BUFFER_PTS (buffer) = pos;
537   } else {
538     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
539   }
540
541   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
542       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
543 }
544
545 /* Must be called from the scheduler context */
546 GstFlowReturn
547 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
548     GstBuffer * buffer)
549 {
550   GstAdaptiveDemux *demux = stream->demux;
551   GstFlowReturn ret = GST_FLOW_OK;
552   gboolean discont = FALSE;
553   /* Pending events */
554   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
555       NULL, *stream_start = NULL, *buffer_gap = NULL;
556   GList *pending_events = NULL;
557
558   if (stream->compute_segment) {
559     gst_adaptive_demux2_stream_prepare_segment (demux, stream,
560         stream->first_and_live);
561     stream->compute_segment = FALSE;
562     stream->first_and_live = FALSE;
563   }
564
565   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
566     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
567     buffer_gap =
568         gst_event_new_gap (GST_BUFFER_PTS (buffer),
569         GST_BUFFER_DURATION (buffer));
570   }
571
572   if (stream->first_fragment_buffer) {
573     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
574     if (demux->segment.rate < 0)
575       /* Set DISCONT flag for every first buffer in reverse playback mode
576        * as each fragment for its own has to be reversed */
577       discont = TRUE;
578     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
579     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
580
581     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
582
583     /* Do we need to inject STREAM_START and SEGMENT events ?
584      *
585      * This can happen when a stream is restarted, and also when switching to a
586      * variant which needs a header (in which case downloading_header will be
587      * TRUE)
588      */
589     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
590       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
591       pending_segment = gst_event_new_segment (&stream->parse_segment);
592       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
593       stream->send_segment = FALSE;
594       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
595       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
596       stream_start = gst_event_new_stream_start ("bogus");
597       if (demux->have_group_id)
598         gst_event_set_group_id (stream_start, demux->group_id);
599     }
600   } else {
601     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
602   }
603   stream->first_fragment_buffer = FALSE;
604
605   if (stream->discont) {
606     discont = TRUE;
607     stream->discont = FALSE;
608   }
609
610   if (discont) {
611     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
612     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
613   } else {
614     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
615   }
616
617   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
618   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
619   if (G_UNLIKELY (stream->pending_caps)) {
620     pending_caps = gst_event_new_caps (stream->pending_caps);
621     gst_caps_unref (stream->pending_caps);
622     stream->pending_caps = NULL;
623   }
624
625   if (G_UNLIKELY (stream->pending_tags)) {
626     GstTagList *tags = stream->pending_tags;
627
628     stream->pending_tags = NULL;
629
630     if (tags)
631       pending_tags = gst_event_new_tag (tags);
632   }
633   if (G_UNLIKELY (stream->pending_events)) {
634     pending_events = stream->pending_events;
635     stream->pending_events = NULL;
636   }
637
638   /* Do not push events or buffers holding the manifest lock */
639   if (G_UNLIKELY (stream_start)) {
640     GST_DEBUG_OBJECT (stream,
641         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
642     gst_pad_send_event (stream->parsebin_sink, stream_start);
643   }
644   if (G_UNLIKELY (pending_caps)) {
645     GST_DEBUG_OBJECT (stream,
646         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
647     gst_pad_send_event (stream->parsebin_sink, pending_caps);
648   }
649   if (G_UNLIKELY (pending_segment)) {
650     GST_DEBUG_OBJECT (stream,
651         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
652     gst_pad_send_event (stream->parsebin_sink, pending_segment);
653   }
654   if (G_UNLIKELY (pending_tags)) {
655     GST_DEBUG_OBJECT (stream,
656         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
657     gst_pad_send_event (stream->parsebin_sink, pending_tags);
658   }
659   while (pending_events != NULL) {
660     GstEvent *event = pending_events->data;
661
662     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
663     if (!gst_pad_send_event (stream->parsebin_sink, event))
664       GST_ERROR_OBJECT (stream, "Failed to send pending event");
665
666     pending_events = g_list_delete_link (pending_events, pending_events);
667   }
668
669   GST_DEBUG_OBJECT (stream,
670       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
671       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
672       GST_BUFFER_OFFSET (buffer));
673
674   ret = gst_pad_chain (stream->parsebin_sink, buffer);
675
676   if (buffer_gap) {
677     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
678     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
679   }
680
681   if (G_UNLIKELY (stream->cancelled)) {
682     GST_LOG_OBJECT (demux, "Stream was cancelled");
683     return GST_FLOW_FLUSHING;
684   }
685
686   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
687
688   return ret;
689 }
690
691 static GstFlowReturn
692 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
693     GstBuffer * buffer)
694 {
695   GstAdaptiveDemux *demux = stream->demux;
696   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
697   GstFlowReturn ret = GST_FLOW_OK;
698
699   /* do not make any changes if the stream is cancelled */
700   if (G_UNLIKELY (stream->cancelled)) {
701     gst_buffer_unref (buffer);
702     return GST_FLOW_FLUSHING;
703   }
704
705   /* starting_fragment is set to TRUE at the beginning of
706    * _stream_download_fragment()
707    * /!\ If there is a header/index being downloaded, then this will
708    * be TRUE for the first one ... but FALSE for the remaining ones,
709    * including the *actual* fragment ! */
710   if (stream->starting_fragment) {
711     stream->starting_fragment = FALSE;
712     if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
713       return GST_FLOW_ERROR;
714   }
715
716   stream->download_total_bytes += gst_buffer_get_size (buffer);
717
718   GST_TRACE_OBJECT (stream,
719       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
720       gst_buffer_get_size (buffer));
721
722   ret = klass->data_received (demux, stream, buffer);
723
724   if (ret != GST_FLOW_OK) {
725     GST_DEBUG_OBJECT (stream, "data_received returned %s",
726         gst_flow_get_name (ret));
727
728     if (ret == GST_FLOW_FLUSHING) {
729       /* do not make any changes if the stream is cancelled */
730       if (G_UNLIKELY (stream->cancelled)) {
731         return ret;
732       }
733     }
734
735     if (ret < GST_FLOW_EOS) {
736       GstEvent *eos = gst_event_new_eos ();
737       GST_ELEMENT_FLOW_ERROR (demux, ret);
738
739       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
740
741       /* TODO push this on all pads */
742       gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
743       gst_pad_send_event (stream->parsebin_sink, eos);
744       ret = GST_FLOW_ERROR;
745
746       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
747     }
748   }
749
750   return ret;
751 }
752
753 /* Calculate the low and high download buffering watermarks
754  * in time as MAX (low-watermark-time, low-watermark-fragments) and
755  * MIN (high-watermark-time, high-watermark-fragments) respectively
756  */
757 static void
758 calculate_track_thresholds (GstAdaptiveDemux * demux,
759     GstClockTime fragment_duration, GstClockTime * low_threshold,
760     GstClockTime * high_threshold)
761 {
762   GST_OBJECT_LOCK (demux);
763   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
764   if (*low_threshold == 0 ||
765       (demux->buffering_low_watermark_time != 0
766           && demux->buffering_low_watermark_time > *low_threshold)) {
767     *low_threshold = demux->buffering_low_watermark_time;
768   }
769
770   *high_threshold =
771       demux->buffering_high_watermark_fragments * fragment_duration;
772   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
773           && demux->buffering_high_watermark_time < *high_threshold)) {
774     *high_threshold = demux->buffering_high_watermark_time;
775   }
776
777   /* Make sure the low and high thresholds are less than the maximum buffering
778    * time */
779   if (*high_threshold == 0 ||
780       (demux->max_buffering_time != 0
781           && demux->max_buffering_time < *high_threshold)) {
782     *high_threshold = demux->max_buffering_time;
783   }
784
785   if (*low_threshold == 0 ||
786       (demux->max_buffering_time != 0
787           && demux->max_buffering_time < *low_threshold)) {
788     *low_threshold = demux->max_buffering_time;
789   }
790
791   /* Make sure the high threshold is higher than (or equal to) the low threshold.
792    * It's OK if they are the same, as the minimum download is 1 fragment */
793   if (*high_threshold == 0 ||
794       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
795     *high_threshold = *low_threshold;
796   }
797   GST_OBJECT_UNLOCK (demux);
798 }
799
800 static gboolean
801 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
802     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
803 {
804   gboolean need_to_wait = TRUE;
805   gboolean have_any_tracks = FALSE;
806   gboolean have_active_tracks = FALSE;
807   gboolean have_filled_inactive = FALSE;
808   gboolean update_buffering = FALSE;
809
810   GstClockTime low_threshold = 0, high_threshold = 0;
811   GList *iter;
812
813   calculate_track_thresholds (demux, fragment_duration,
814       &low_threshold, &high_threshold);
815
816   /* If there are no tracks at all, don't wait. If there are no active
817    * tracks, keep filling until at least one track is full. If there
818    * are active tracks, require that they are all full */
819   TRACKS_LOCK (demux);
820   for (iter = stream->tracks; iter; iter = iter->next) {
821     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
822
823     /* Update the buffering threshold */
824     if (low_threshold != track->buffering_threshold) {
825       /* The buffering threshold for this track changed, make sure to
826        * re-check buffering status */
827       update_buffering = TRUE;
828       track->buffering_threshold = low_threshold;
829     }
830
831     have_any_tracks = TRUE;
832     if (track->active)
833       have_active_tracks = TRUE;
834
835     if (track->level_time < high_threshold) {
836       if (track->active) {
837         need_to_wait = FALSE;
838         GST_DEBUG_OBJECT (demux,
839             "stream %p track %s has level %" GST_TIME_FORMAT
840             " - needs more data (target %" GST_TIME_FORMAT
841             ") (fragment duration %" GST_TIME_FORMAT ")", stream,
842             track->stream_id, GST_TIME_ARGS (track->level_time),
843             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
844         continue;
845       }
846     } else if (!track->active) {        /* track is over threshold and inactive */
847       have_filled_inactive = TRUE;
848     }
849
850     GST_DEBUG_OBJECT (demux,
851         "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
852         stream, track->stream_id, track->active,
853         GST_TIME_ARGS (track->level_time));
854   }
855
856   /* If there are no tracks, don't wait (we might need data to create them),
857    * or if there are active tracks that need more data to hit the threshold,
858    * don't wait. Otherwise it means all active tracks are full and we should wait */
859   if (!have_any_tracks) {
860     GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
861     need_to_wait = FALSE;
862   } else if (!have_active_tracks && !have_filled_inactive) {
863     GST_DEBUG_OBJECT (demux,
864         "stream %p has inactive tracks that need more data - not waiting",
865         stream);
866     need_to_wait = FALSE;
867   }
868
869   if (need_to_wait) {
870     for (iter = stream->tracks; iter; iter = iter->next) {
871       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
872       track->waiting_del_level = high_threshold;
873       GST_DEBUG_OBJECT (demux,
874           "Waiting for queued data on stream %p track %s to drop below %"
875           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
876           stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
877           GST_TIME_ARGS (fragment_duration));
878     }
879   }
880
881   if (update_buffering) {
882     demux_update_buffering_locked (demux);
883     demux_post_buffering_locked (demux);
884   }
885
886   TRACKS_UNLOCK (demux);
887
888   return need_to_wait;
889 }
890
891 static GstAdaptiveDemuxTrack *
892 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
893 {
894   GList *tmp;
895   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
896   gint num_possible_tracks = 0;
897   GstStream *gst_stream;
898   const gchar *internal_stream_id;
899   GstStreamType stream_type;
900
901   gst_stream = gst_pad_get_stream (pad);
902
903   /* FIXME: Edward: Added assertion because I don't see in what cases we would
904    * end up with a pad from parsebin which wouldn't have an associated
905    * GstStream. */
906   g_assert (gst_stream);
907
908   internal_stream_id = gst_stream_get_stream_id (gst_stream);
909   stream_type = gst_stream_get_stream_type (gst_stream);
910
911   GST_DEBUG_OBJECT (pad,
912       "Trying to match pad from parsebin with internal streamid %s and caps %"
913       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
914       gst_stream_get_caps (gst_stream));
915
916   /* Try to match directly by the track's pending upstream_stream_id */
917   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
918     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
919
920     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
921       continue;
922
923     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
924         track->upstream_stream_id);
925
926     if (first_matched_track == NULL)
927       first_matched_track = track;
928     num_possible_tracks++;
929
930     /* If this track has a desired upstream stream id, match on it */
931     if (track->upstream_stream_id == NULL ||
932         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
933       /* This is not the track for this pad */
934       continue;
935     }
936
937     /* Remove pending upstream id (we have matched it for the pending
938      * stream_id) */
939     g_free (track->upstream_stream_id);
940     track->upstream_stream_id = NULL;
941     found_track = track;
942     break;
943   }
944
945   if (found_track == NULL) {
946     /* If we arrive here, it means the stream is switching pads after
947      * the stream has already started running */
948     /* No track is currently waiting for this particular stream id -
949      * try and match an existing linked track. If there's only 1 possible,
950      * take it. */
951     if (num_possible_tracks == 1 && first_matched_track != NULL) {
952       GST_LOG_OBJECT (pad, "Only one possible track to link to");
953       found_track = first_matched_track;
954     }
955   }
956
957   if (found_track == NULL) {
958     /* TODO: There are multiple possible tracks, need to match based
959      * on language code and caps. Have you found a stream like this? */
960     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
961   }
962
963   if (found_track != NULL) {
964     if (!gst_pad_is_linked (found_track->sinkpad)) {
965       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
966           found_track->sinkpad);
967
968       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
969         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
970         /* FIXME : Do something if we can't link ? */
971       }
972     } else {
973       /* Store pad as pending link */
974       GST_LOG_OBJECT (pad,
975           "Remembering pad to be linked when current pad is unlinked");
976       g_assert (found_track->pending_srcpad == NULL);
977       found_track->pending_srcpad = gst_object_ref (pad);
978     }
979   }
980
981   if (gst_stream)
982     gst_object_unref (gst_stream);
983
984   return found_track;
985 }
986
987 static void
988 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
989     GstAdaptiveDemux2Stream * stream)
990 {
991   GList *iter;
992   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
993
994   /* Remove from pending source pad */
995   TRACKS_LOCK (stream->demux);
996   for (iter = stream->tracks; iter; iter = iter->next) {
997     GstAdaptiveDemuxTrack *track = iter->data;
998     if (track->pending_srcpad == pad) {
999       gst_object_unref (track->pending_srcpad);
1000       track->pending_srcpad = NULL;
1001       break;
1002     }
1003   }
1004   TRACKS_UNLOCK (stream->demux);
1005 }
1006
1007 static void
1008 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1009     GstAdaptiveDemux2Stream * stream)
1010 {
1011   if (!GST_PAD_IS_SRC (pad))
1012     return;
1013
1014   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1015
1016   if (!match_parsebin_to_track (stream, pad))
1017     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1018
1019   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1020 }
1021
1022 static void
1023 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1024     GstElement * element, GstAdaptiveDemux * demux)
1025 {
1026   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1027     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1028     g_object_set (element, "ignore-pcr", TRUE, NULL);
1029   }
1030 }
1031
1032 /* must be called with manifest_lock taken */
1033 static gboolean
1034 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1035 {
1036   GstAdaptiveDemux *demux = stream->demux;
1037
1038   if (stream->parsebin == NULL) {
1039     GstEvent *event;
1040
1041     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1042
1043     /* Workaround to detect if tsdemux is being used */
1044     if (tsdemux_type == 0) {
1045       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1046       if (element) {
1047         tsdemux_type = G_OBJECT_TYPE (element);
1048         gst_object_unref (element);
1049       }
1050     }
1051
1052     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1053     if (tsdemux_type)
1054       g_signal_connect (stream->parsebin, "deep-element-added",
1055           (GCallback) parsebin_deep_element_added_cb, demux);
1056     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1057     stream->parsebin_sink =
1058         gst_element_get_static_pad (stream->parsebin, "sink");
1059     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1060         G_CALLBACK (parsebin_pad_added_cb), stream);
1061     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1062         G_CALLBACK (parsebin_pad_removed_cb), stream);
1063
1064     event = gst_event_new_stream_start ("bogus");
1065     if (demux->have_group_id)
1066       gst_event_set_group_id (event, demux->group_id);
1067
1068     gst_pad_send_event (stream->parsebin_sink, event);
1069
1070     /* Not sure if these need to be outside the manifest lock: */
1071     gst_element_sync_state_with_parent (stream->parsebin);
1072     stream->last_status_code = 200;     /* default to OK */
1073   }
1074   return TRUE;
1075 }
1076
1077 static void
1078 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1079     GstAdaptiveDemux2Stream * stream)
1080 {
1081 }
1082
1083 static void
1084 on_download_error (DownloadRequest * request, DownloadRequestState state,
1085     GstAdaptiveDemux2Stream * stream)
1086 {
1087   GstAdaptiveDemux *demux = stream->demux;
1088   guint last_status_code = request->status_code;
1089   gboolean live;
1090
1091   stream->download_active = FALSE;
1092   stream->last_status_code = last_status_code;
1093
1094   GST_DEBUG_OBJECT (stream,
1095       "Download finished with error, request state %d http status %u, dc %d",
1096       request->state, last_status_code, stream->download_error_count);
1097
1098   live = gst_adaptive_demux_is_live (demux);
1099   if (((last_status_code / 100 == 4 && live)
1100           || last_status_code / 100 == 5)) {
1101     /* 4xx/5xx */
1102     /* if current position is before available start, switch to next */
1103     if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1104       goto flushing;
1105
1106     if (live) {
1107       gint64 range_start, range_stop;
1108
1109       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1110               &range_stop))
1111         goto flushing;
1112
1113       if (demux->segment.position < range_start) {
1114         GstFlowReturn ret;
1115
1116         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1117         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1118
1119         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1120
1121         ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1122         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1123             gst_flow_get_name (ret));
1124
1125         if (ret == GST_FLOW_OK)
1126           goto again;
1127
1128       } else if (demux->segment.position > range_stop) {
1129         /* wait a bit to be in range, we don't have any locks at that point */
1130         GstClockTime wait_time =
1131             gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1132             stream);
1133         if (wait_time > 0) {
1134           GST_DEBUG_OBJECT (stream,
1135               "Download waiting for %" GST_TIME_FORMAT,
1136               GST_TIME_ARGS (wait_time));
1137           g_assert (stream->pending_cb_id == 0);
1138           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1139           stream->pending_cb_id =
1140               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1141               wait_time,
1142               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1143               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1144         }
1145       }
1146     }
1147
1148   flushing:
1149     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1150       /* looks like there is no way of knowing when a live stream has ended
1151        * Have to assume we are falling behind and cause a manifest reload */
1152       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1153       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1154       return;
1155     }
1156   } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1157     /* If this is the last fragment, consider failures EOS and not actual
1158      * errors. Due to rounding errors in the durations, the last fragment
1159      * might not actually exist */
1160     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1161     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1162     return;
1163   } else {
1164     /* retry same segment */
1165     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1166       gst_adaptive_demux2_stream_error (stream);
1167       return;
1168     }
1169     goto again;
1170   }
1171
1172 again:
1173   /* wait a short time in case the server needs a bit to recover */
1174   GST_LOG_OBJECT (stream,
1175       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1176   g_assert (stream->pending_cb_id == 0);
1177   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1178       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1179       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1180 }
1181
1182 static void
1183 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1184     DownloadRequest * request)
1185 {
1186   GstClockTimeDiff last_download_duration;
1187   guint64 fragment_bytes_downloaded = request->content_received;
1188
1189   /* The stream last_download time tracks the full download time for now */
1190   stream->last_download_time =
1191       GST_CLOCK_DIFF (request->download_request_time,
1192       request->download_end_time);
1193
1194   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1195   last_download_duration =
1196       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1197
1198   /* If the entire response arrived in the first buffer
1199    * though, include the request time to get a valid
1200    * bitrate estimate */
1201   if (last_download_duration < 2 * stream->last_download_time)
1202     last_download_duration = stream->last_download_time;
1203
1204   if (last_download_duration > 0) {
1205     stream->last_bitrate =
1206         gst_util_uint64_scale (fragment_bytes_downloaded,
1207         8 * GST_SECOND, last_download_duration);
1208
1209     GST_DEBUG_OBJECT (stream,
1210         "Updated stream bitrate. %" G_GUINT64_FORMAT
1211         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1212         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1213         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1214   }
1215 }
1216
1217 static void
1218 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1219     GstAdaptiveDemux2Stream * stream)
1220 {
1221   GstAdaptiveDemux *demux = stream->demux;
1222   GstBuffer *buffer = download_request_take_buffer (request);
1223
1224   if (buffer) {
1225     GstFlowReturn ret;
1226
1227     GST_DEBUG_OBJECT (stream,
1228         "Handling buffer of %" G_GSIZE_FORMAT
1229         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1230         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1231         request->content_received, request->content_length);
1232
1233     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1234     download_request_unlock (request);
1235     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1236     download_request_lock (request);
1237
1238     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1239       return;
1240
1241     if (ret != GST_FLOW_OK) {
1242       GST_DEBUG_OBJECT (stream,
1243           "Buffer parsing returned: %d %s. Aborting download", ret,
1244           gst_flow_get_name (ret));
1245
1246       if (!stream->downloading_header && !stream->downloading_index)
1247         update_stream_bitrate (stream, request);
1248
1249       downloadhelper_cancel_request (demux->download_helper, request);
1250
1251       /* cancellation is async, so recycle our download request to avoid races */
1252       download_request_unref (stream->download_request);
1253       stream->download_request = download_request_new ();
1254
1255       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1256     }
1257   }
1258 }
1259
1260 static void
1261 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1262     GstAdaptiveDemux2Stream * stream)
1263 {
1264   GstFlowReturn ret = GST_FLOW_OK;
1265   GstBuffer *buffer;
1266
1267   stream->download_active = FALSE;
1268
1269   if (G_UNLIKELY (stream->cancelled))
1270     return;
1271
1272   GST_DEBUG_OBJECT (stream,
1273       "Stream %p %s download for %s is complete with state %d",
1274       stream, uritype (stream), request->uri, request->state);
1275
1276   /* Update bitrate for fragment downloads */
1277   if (!stream->downloading_header && !stream->downloading_index)
1278     update_stream_bitrate (stream, request);
1279
1280   buffer = download_request_take_buffer (request);
1281   if (buffer)
1282     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1283
1284   GST_DEBUG_OBJECT (stream,
1285       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1286       request->uri, ret, gst_flow_get_name (ret), stream->state);
1287
1288   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1289     return;
1290
1291   g_assert (stream->pending_cb_id == 0);
1292   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1293 }
1294
1295 /* must be called from the scheduler context
1296  *
1297  * Will submit the request only, which will complete asynchronously
1298  */
1299 static GstFlowReturn
1300 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1301     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1302     gint64 end)
1303 {
1304   DownloadRequest *request = stream->download_request;
1305
1306   GST_DEBUG_OBJECT (demux,
1307       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1308       uritype (stream), uri, start, end);
1309
1310   if (!gst_adaptive_demux2_stream_create_parser (stream))
1311     return GST_FLOW_ERROR;
1312
1313   /* Configure our download request */
1314   download_request_set_uri (request, uri, start, end);
1315
1316   if (stream->downloading_header || stream->downloading_index) {
1317     download_request_set_callbacks (request,
1318         (DownloadRequestEventCallback) on_download_complete,
1319         (DownloadRequestEventCallback) on_download_error,
1320         (DownloadRequestEventCallback) on_download_cancellation,
1321         (DownloadRequestEventCallback) NULL, stream);
1322   } else {
1323     download_request_set_callbacks (request,
1324         (DownloadRequestEventCallback) on_download_complete,
1325         (DownloadRequestEventCallback) on_download_error,
1326         (DownloadRequestEventCallback) on_download_cancellation,
1327         (DownloadRequestEventCallback) on_download_progress, stream);
1328   }
1329
1330   if (!downloadhelper_submit_request (demux->download_helper,
1331           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1332     return GST_FLOW_ERROR;
1333
1334   stream->download_active = TRUE;
1335
1336   return GST_FLOW_OK;
1337 }
1338
1339 /* must be called from the scheduler context */
1340 static GstFlowReturn
1341 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1342 {
1343   GstAdaptiveDemux *demux = stream->demux;
1344   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1345   gchar *url = NULL;
1346
1347   /* FIXME :  */
1348   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1349   if (stream->starting_fragment) {
1350     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1351         stream->fragment.uri ? "FRAGMENT " : "",
1352         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1353         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1354
1355     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1356         stream->fragment.index_uri == NULL)
1357       goto no_url_error;
1358
1359     stream->first_fragment_buffer = TRUE;
1360     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1361   }
1362
1363   if (stream->need_header && stream->fragment.header_uri != NULL) {
1364
1365     /* Set the need_index flag when we start the header if we'll also need the index */
1366     stream->need_index = (stream->fragment.index_uri != NULL);
1367
1368     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1369         G_GINT64_FORMAT, stream->fragment.header_uri,
1370         stream->fragment.header_range_start, stream->fragment.header_range_end);
1371
1372     stream->downloading_header = TRUE;
1373
1374     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1375         stream->fragment.header_uri, stream->fragment.header_range_start,
1376         stream->fragment.header_range_end);
1377   }
1378
1379   /* check if we have an index */
1380   if (stream->need_index && stream->fragment.index_uri != NULL) {
1381     GST_DEBUG_OBJECT (stream,
1382         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1383         stream->fragment.index_uri,
1384         stream->fragment.index_range_start, stream->fragment.index_range_end);
1385
1386     stream->downloading_index = TRUE;
1387
1388     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1389         stream->fragment.index_uri, stream->fragment.index_range_start,
1390         stream->fragment.index_range_end);
1391   }
1392
1393   url = stream->fragment.uri;
1394   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1395   if (!url)
1396     return GST_FLOW_OK;
1397
1398   /* Download the actual fragment, either in chunks or in one go */
1399   stream->first_fragment_buffer = TRUE;
1400
1401   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1402       && stream->fragment.chunk_size != 0) {
1403     /* Handle chunk downloading */
1404     gint64 range_start = stream->fragment.range_start;
1405     gint64 range_end = stream->fragment.range_end;
1406     gint chunk_size = stream->fragment.chunk_size;
1407     gint64 chunk_end;
1408
1409     /* HTTP ranges are inclusive for the end */
1410     if (chunk_size != -1) {
1411       chunk_end = range_start + chunk_size - 1;
1412       if (range_end != -1 && range_end < chunk_end)
1413         chunk_end = range_end;
1414     } else {
1415       chunk_end = range_end;
1416     }
1417
1418     GST_DEBUG_OBJECT (stream,
1419         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1420         url, range_start, chunk_end);
1421     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1422         range_start, chunk_end);
1423   }
1424
1425   /* regular single chunk download */
1426   stream->fragment.chunk_size = 0;
1427
1428   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1429       stream->fragment.range_start, stream->fragment.range_end);
1430
1431 no_url_error:
1432   {
1433     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1434         (_("Failed to get fragment URL.")),
1435         ("An error happened when getting fragment URL"));
1436     return GST_FLOW_ERROR;
1437   }
1438 }
1439
1440 static gboolean
1441 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1442     GstEvent * event)
1443 {
1444   gboolean ret = TRUE;
1445   GstPad *pad;
1446
1447   /* If there's a parsebin, push the event through it */
1448   if (stream->parsebin_sink != NULL) {
1449     pad = gst_object_ref (stream->parsebin_sink);
1450     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1451     ret = gst_pad_send_event (pad, gst_event_ref (event));
1452     gst_object_unref (pad);
1453   }
1454
1455   /* If the event is EOS, ensure that all tracks are EOS. This catches
1456    * the case where the parsebin hasn't parsed anything yet (we switched
1457    * to a never before used track right near EOS, or it didn't parse enough
1458    * to create pads and be able to send EOS through to the tracks.
1459    *
1460    * We don't need to care about any other events
1461    */
1462   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1463     GList *iter;
1464
1465     for (iter = stream->tracks; iter; iter = iter->next) {
1466       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1467       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1468     }
1469   }
1470
1471   gst_event_unref (event);
1472   return ret;
1473 }
1474
1475 static void
1476 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1477 {
1478   GstAdaptiveDemux *demux = stream->demux;
1479   GstMessage *msg;
1480   GstStructure *details;
1481
1482   details = gst_structure_new_empty ("details");
1483   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1484       stream->last_status_code, NULL);
1485
1486   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1487
1488   if (stream->last_error) {
1489     gchar *debug = g_strdup_printf ("Error on stream %s",
1490         GST_OBJECT_NAME (stream));
1491     msg =
1492         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1493         stream->last_error, debug, details);
1494     GST_ERROR_OBJECT (stream, "Download error: %s",
1495         stream->last_error->message);
1496     g_free (debug);
1497   } else {
1498     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1499         _("Couldn't download fragments"));
1500     msg =
1501         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1502         "Fragment downloading has failed consecutive times", details);
1503     g_error_free (err);
1504     GST_ERROR_OBJECT (stream,
1505         "Download error: Couldn't download fragments, too many failures");
1506   }
1507
1508   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1509 }
1510
1511 /* Called when a stream reaches the end of a playback segment */
1512 static void
1513 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1514 {
1515   GstAdaptiveDemux *demux = stream->demux;
1516   GstFlowReturn combined =
1517       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1518
1519   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1520
1521   if (gst_adaptive_demux_has_next_period (demux)) {
1522     if (combined == GST_FLOW_EOS) {
1523       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1524       gst_adaptive_demux_advance_period (demux);
1525     } else {
1526       /* Ensure the 'has_next_period' flag is set on the period before
1527        * pushing EOS to the stream, so that the output loop knows not
1528        * to actually output the event */
1529       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1530       demux->input_period->has_next_period = TRUE;
1531     }
1532   }
1533
1534   if (demux->priv->outputs) {
1535     GstEvent *eos = gst_event_new_eos ();
1536
1537     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1538     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1539
1540     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1541     gst_adaptive_demux2_stream_push_event (stream, eos);
1542   } else {
1543     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1544     gst_adaptive_demux2_stream_error (stream);
1545   }
1546 }
1547
1548 static gboolean
1549 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1550 {
1551   GstAdaptiveDemux *demux = stream->demux;
1552
1553   gboolean is_live = gst_adaptive_demux_is_live (demux);
1554
1555   stream->pending_cb_id = 0;
1556
1557   /* Refetch the playlist now after we waited */
1558   /* FIXME: Make this manifest update async and handle it on completion */
1559   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1560     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1561   }
1562
1563   /* We were called here from a timeout, so if the load function wants to loop
1564    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1565    * way */
1566   while (gst_adaptive_demux2_stream_next_download (stream));
1567
1568   return G_SOURCE_REMOVE;
1569 }
1570
1571 static gboolean
1572 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1573     * stream)
1574 {
1575   /* If the state already moved on, the stream was stopped, or another track
1576    * already woke up and needed data */
1577   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1578     return G_SOURCE_REMOVE;
1579
1580   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1581
1582   return G_SOURCE_REMOVE;
1583 }
1584
1585 void
1586 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1587     stream)
1588 {
1589   GstAdaptiveDemux *demux = stream->demux;
1590   GList *iter;
1591
1592   for (iter = stream->tracks; iter; iter = iter->next) {
1593     GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
1594     tmp_track->waiting_del_level = 0;
1595   }
1596
1597   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1598
1599   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1600       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1601       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1602 }
1603
1604 void
1605 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1606 {
1607   GstAdaptiveDemux *demux = stream->demux;
1608
1609   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1610     return;
1611
1612   g_assert (stream->pending_cb_id == 0);
1613
1614   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1615   stream->pending_cb_id =
1616       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1617       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1618       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1619 }
1620
1621 static void
1622 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1623     stream)
1624 {
1625   GstAdaptiveDemux *demux = stream->demux;
1626
1627   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1628           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1629
1630     if (!gst_adaptive_demux_has_next_period (demux)) {
1631       /* Wait only if we can ensure current manifest has been expired.
1632        * The meaning "we have next period" *WITH* EOS is that, current
1633        * period has been ended but we can continue to the next period */
1634       GST_DEBUG_OBJECT (stream,
1635           "Live playlist EOS - waiting for manifest update");
1636       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1637       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1638       return;
1639     }
1640
1641     if (stream->replaced)
1642       return;
1643   }
1644
1645   gst_adaptive_demux2_stream_end_of_manifest (stream);
1646 }
1647
1648 static gboolean
1649 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1650 {
1651   GstAdaptiveDemux *demux = stream->demux;
1652   gboolean live = gst_adaptive_demux_is_live (demux);
1653   GstFlowReturn ret = GST_FLOW_OK;
1654
1655   stream->pending_cb_id = 0;
1656
1657   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1658
1659   switch (stream->state) {
1660     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1661     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1662     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1663     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1664     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1665       /* Get information about the fragment to download */
1666       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1667       ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1668       GST_DEBUG_OBJECT (stream,
1669           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1670
1671       if (ret == GST_FLOW_OK)
1672         stream->starting_fragment = TRUE;
1673       break;
1674     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1675       break;
1676     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1677       GST_ERROR_OBJECT (stream,
1678           "Unexpected stream state EOS. The stream should not be running now.");
1679       return FALSE;
1680     default:
1681       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1682       g_assert_not_reached ();
1683       break;
1684   }
1685
1686   if (ret == GST_FLOW_OK) {
1687     /* Wait for room in the output tracks */
1688     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1689             stream->fragment.duration)) {
1690       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1691       return FALSE;
1692     }
1693   }
1694
1695   if (ret == GST_FLOW_OK) {
1696     /* wait for live fragments to be available */
1697     if (live) {
1698       GstClockTime wait_time =
1699           gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1700       if (wait_time > 0) {
1701         GST_DEBUG_OBJECT (stream,
1702             "Download waiting for %" GST_TIME_FORMAT,
1703             GST_TIME_ARGS (wait_time));
1704
1705         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1706
1707         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1708         g_assert (stream->pending_cb_id == 0);
1709         stream->pending_cb_id =
1710             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1711             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1712             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1713         return FALSE;
1714       }
1715     }
1716
1717     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1718       GST_ERROR_OBJECT (demux,
1719           "Failed to begin fragment download for stream %p", stream);
1720       return FALSE;
1721     }
1722   }
1723
1724   switch (ret) {
1725     case GST_FLOW_OK:
1726       break;                    /* all is good, let's go */
1727     case GST_FLOW_EOS:
1728       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1729       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1730       return FALSE;
1731     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1732       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1733       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1734       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1735           (GSourceFunc) gst_adaptive_demux_handle_lost_sync, demux, NULL);
1736       return FALSE;
1737     case GST_FLOW_NOT_LINKED:
1738     {
1739       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1740
1741       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1742           == GST_FLOW_NOT_LINKED) {
1743         GST_ELEMENT_FLOW_ERROR (demux, ret);
1744       }
1745     }
1746       break;
1747
1748     case GST_FLOW_FLUSHING:
1749       /* Flushing is normal, the target track might have been unselected */
1750       if (G_UNLIKELY (stream->cancelled))
1751         return FALSE;
1752
1753     default:
1754       if (ret <= GST_FLOW_ERROR) {
1755         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1756         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1757           gst_adaptive_demux2_stream_error (stream);
1758           return FALSE;
1759         }
1760
1761         g_clear_error (&stream->last_error);
1762
1763         /* First try to update the playlist for non-live playlists
1764          * in case the URIs have changed in the meantime. But only
1765          * try it the first time, after that we're going to wait a
1766          * a bit to not flood the server */
1767         if (stream->download_error_count == 1
1768             && !gst_adaptive_demux_is_live (demux)) {
1769           /* TODO hlsdemux had more options to this function (boolean and err) */
1770           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1771             /* Retry immediately, the playlist actually has changed */
1772             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1773             return TRUE;
1774           }
1775         }
1776
1777         /* Wait half the fragment duration before retrying */
1778         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1779         g_assert (stream->pending_cb_id == 0);
1780         stream->pending_cb_id =
1781             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1782             stream->fragment.duration / 2,
1783             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1784             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1785         return FALSE;
1786       }
1787       break;
1788   }
1789
1790   return FALSE;
1791 }
1792
1793 static gboolean
1794 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1795 {
1796   GstAdaptiveDemux *demux = stream->demux;
1797   gboolean end_of_manifest = FALSE;
1798
1799   GST_LOG_OBJECT (stream, "Looking for next download");
1800
1801   /* Restarting download, figure out new position
1802    * FIXME : Move this to a separate function ? */
1803   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1804     GstClockTimeDiff stream_time = 0;
1805
1806     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1807
1808     if (stream->parsebin_sink != NULL) {
1809       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1810        * this is the first time we've used this stream, so it's all good) */
1811       gst_adaptive_demux2_stream_push_event (stream,
1812           gst_event_new_flush_start ());
1813       gst_adaptive_demux2_stream_push_event (stream,
1814           gst_event_new_flush_stop (FALSE));
1815     }
1816
1817     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1818     stream_time = stream->start_position;
1819
1820     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1821         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1822
1823     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1824       /* TODO check return */
1825       gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1826           0, stream_time, &stream_time);
1827       stream->current_position = stream->start_position;
1828
1829       GST_DEBUG_OBJECT (stream,
1830           "stream_time after restart seek: %" GST_STIME_FORMAT
1831           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1832           GST_STIME_ARGS (stream->current_position));
1833     }
1834
1835     /* Trigger (re)computation of the parsebin input segment */
1836     stream->compute_segment = TRUE;
1837
1838     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1839
1840     stream->discont = TRUE;
1841     stream->need_header = TRUE;
1842     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1843   }
1844
1845   /* Check if we're done with our segment */
1846   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1847   if (demux->segment.rate > 0) {
1848     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1849         && stream->current_position >= demux->segment.stop) {
1850       end_of_manifest = TRUE;
1851     }
1852   } else {
1853     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1854         && stream->current_position <= demux->segment.start) {
1855       end_of_manifest = TRUE;
1856     }
1857   }
1858   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1859
1860   if (end_of_manifest) {
1861     gst_adaptive_demux2_stream_end_of_manifest (stream);
1862     return FALSE;
1863   }
1864   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1865 }
1866
1867 static gboolean
1868 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1869 {
1870   GstAdaptiveDemux *demux = stream->demux;
1871   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1872
1873   if (!klass->stream_can_start)
1874     return TRUE;
1875   return klass->stream_can_start (demux, stream);
1876 }
1877
1878 /**
1879  * gst_adaptive_demux2_stream_start:
1880  * @stream: a #GstAdaptiveDemux2Stream
1881  *
1882  * Start the given @stream. Should be called by subclasses that previously
1883  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1884  */
1885 void
1886 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1887 {
1888   GstAdaptiveDemux *demux;
1889
1890   g_return_if_fail (stream && stream->demux);
1891
1892   demux = stream->demux;
1893
1894   if (stream->pending_cb_id != 0 || stream->download_active) {
1895     /* There is already something active / pending on this stream */
1896     GST_LOG_OBJECT (stream, "Stream already running");
1897     return;
1898   }
1899
1900   /* Some streams require a delayed start, i.e. they need more information
1901    * before they can actually be started */
1902   if (!gst_adaptive_demux2_stream_can_start (stream)) {
1903     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1904     return;
1905   }
1906
1907   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1908     GST_LOG_OBJECT (stream, "Stream is EOS already");
1909     return;
1910   }
1911
1912   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1913       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
1914     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
1915         stream->state);
1916     stream->cancelled = FALSE;
1917     stream->replaced = FALSE;
1918     stream->last_ret = GST_FLOW_OK;
1919
1920     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1921       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1922   }
1923
1924   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
1925   stream->pending_cb_id =
1926       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1927       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
1928       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1929 }
1930
1931 void
1932 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
1933 {
1934   GstAdaptiveDemux *demux = stream->demux;
1935
1936   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
1937   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1938
1939   if (stream->pending_cb_id != 0) {
1940     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
1941         stream->pending_cb_id);
1942     stream->pending_cb_id = 0;
1943   }
1944
1945   /* Cancel and drop the existing download request */
1946   downloadhelper_cancel_request (demux->download_helper,
1947       stream->download_request);
1948   download_request_unref (stream->download_request);
1949   stream->downloading_header = stream->downloading_index = FALSE;
1950   stream->download_request = download_request_new ();
1951   stream->download_active = FALSE;
1952 }
1953
1954 gboolean
1955 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
1956 {
1957   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
1958     return FALSE;
1959   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
1960     return FALSE;
1961   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
1962     return FALSE;
1963   return TRUE;
1964 }
1965
1966 gboolean
1967 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
1968 {
1969   GList *tmp;
1970
1971   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
1972     GstAdaptiveDemuxTrack *track = tmp->data;
1973     if (track->selected)
1974       return TRUE;
1975   }
1976
1977   return FALSE;
1978 }
1979
1980 /**
1981  * gst_adaptive_demux2_stream_is_selected:
1982  * @stream: A #GstAdaptiveDemux2Stream
1983  *
1984  * Returns: %TRUE if any of the tracks targetted by @stream is selected
1985  */
1986 gboolean
1987 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
1988 {
1989   gboolean ret;
1990
1991   g_return_val_if_fail (stream && stream->demux, FALSE);
1992
1993   TRACKS_LOCK (stream->demux);
1994   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
1995   TRACKS_UNLOCK (stream->demux);
1996
1997   return ret;
1998 }