a7949bef3a972a48041346268b070ecf44503538
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux-stream.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "gstadaptivedemux-stream.h"
31 #include "gstadaptivedemux-private.h"
32
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
35
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
38
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
41 static GstFlowReturn
42 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
43     stream, GstBuffer * buffer);
44 static GstFlowReturn
45 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
46     stream);
47
48 guint64
49 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
50     stream);
51 static void gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream
52     * stream);
53
54 #define gst_adaptive_demux2_stream_parent_class parent_class
55 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
56     GST_TYPE_OBJECT);
57
58 static void
59 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
60 {
61   GObjectClass *gobject_class = (GObjectClass *) klass;
62
63   gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
64
65   klass->data_received = gst_adaptive_demux2_stream_data_received_default;
66   klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
67 }
68
69 static GType tsdemux_type = 0;
70
71 static void
72 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
73 {
74   stream->download_request = download_request_new ();
75   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
76   stream->last_ret = GST_FLOW_OK;
77   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
78
79   stream->fragment_bitrates =
80       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
81
82   stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
83
84   gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
85 }
86
87 /* must be called with manifest_lock taken.
88  * It will temporarily drop the manifest_lock in order to join the task.
89  * It will join only the old_streams (the demux->streams are joined by
90  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
91  * called)
92  */
93 static void
94 gst_adaptive_demux2_stream_finalize (GObject * object)
95 {
96   GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
97
98   GST_LOG_OBJECT (object, "Finalizing");
99
100   if (stream->download_request)
101     download_request_unref (stream->download_request);
102
103   g_clear_error (&stream->last_error);
104
105   gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
106
107   if (stream->pending_events) {
108     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
109     stream->pending_events = NULL;
110   }
111
112   if (stream->parsebin_sink) {
113     gst_object_unref (stream->parsebin_sink);
114     stream->parsebin_sink = NULL;
115   }
116
117   if (stream->pad_added_id)
118     g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
119   if (stream->pad_removed_id)
120     g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
121
122   if (stream->parsebin != NULL) {
123     GST_LOG_OBJECT (stream, "Removing parsebin");
124     gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
125     gst_element_set_state (stream->parsebin, GST_STATE_NULL);
126     gst_object_unref (stream->parsebin);
127     stream->parsebin = NULL;
128   }
129
130   g_free (stream->fragment_bitrates);
131
132   g_list_free_full (stream->tracks,
133       (GDestroyNotify) gst_adaptive_demux_track_unref);
134
135   if (stream->pending_caps)
136     gst_caps_unref (stream->pending_caps);
137
138   gst_clear_tag_list (&stream->pending_tags);
139   g_clear_pointer (&stream->stream_collection, gst_object_unref);
140
141   G_OBJECT_CLASS (parent_class)->finalize (object);
142 }
143
144 /**
145  * gst_adaptive_demux2_stream_add_track:
146  * @stream: A #GstAdaptiveDemux2Stream
147  * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
148  *
149  * This function is called when a subclass knows of a target @track that this
150  * @stream can provide.
151  */
152 gboolean
153 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
154     GstAdaptiveDemuxTrack * track)
155 {
156   g_return_val_if_fail (track != NULL, FALSE);
157
158   GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
159       track->stream_id);
160   if (g_list_find (stream->tracks, track)) {
161     GST_DEBUG_OBJECT (stream->demux,
162         "track '%s' already handled by this stream", track->stream_id);
163     return FALSE;
164   }
165
166   if (stream->demux->buffering_low_watermark_time)
167     track->buffering_threshold = stream->demux->buffering_low_watermark_time;
168   else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
169     track->buffering_threshold =
170         MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
171   else {
172     /* Using a starting default, can be overriden later in
173      * ::update_stream_info() */
174     GST_DEBUG_OBJECT (stream,
175         "Setting default 10s buffering threshold on new track");
176     track->buffering_threshold = 10 * GST_SECOND;
177   }
178
179   stream->tracks =
180       g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
181   if (stream->demux) {
182     g_assert (stream->period);
183     gst_adaptive_demux_period_add_track (stream->period, track);
184   }
185   return TRUE;
186 }
187
188 static gboolean
189 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
190 static gboolean
191 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
192 static void
193 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
194     stream);
195 static GstFlowReturn
196 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
197     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
198     gint64 end);
199
200 #ifndef GST_DISABLE_GST_DEBUG
201 static const char *
202 uritype (GstAdaptiveDemux2Stream * s)
203 {
204   if (s->downloading_header)
205     return "header";
206   if (s->downloading_index)
207     return "index";
208   return "fragment";
209 }
210 #endif
211
212 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
213 static gboolean
214 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
215 {
216   GstAdaptiveDemux *demux = stream->demux;
217   DownloadRequest *request = stream->download_request;
218   GstFlowReturn ret;
219
220   gchar *uri = request->uri;
221   gint64 range_start = request->range_start;
222   gint64 range_end = request->range_end;
223   gint64 chunk_size;
224   gint64 chunk_end;
225
226   if (range_end == -1)
227     return FALSE;               /* This was a request to the end, no more to load */
228
229   /* The size of the request that just completed: */
230   chunk_size = range_end + 1 - range_start;
231
232   if (request->content_received < chunk_size)
233     return FALSE;               /* Short read - we're done */
234
235   /* Accumulate the data we just fetched, to figure out the next
236    * request start position and update the target chunk size from
237    * the updated stream fragment info */
238   range_start += chunk_size;
239   range_end = stream->fragment.range_end;
240   chunk_size = stream->fragment.chunk_size;
241
242   if (chunk_size == 0)
243     return FALSE;               /* Sub-class doesn't want another chunk */
244
245   /* HTTP ranges are inclusive for the end */
246   if (chunk_size != -1) {
247     chunk_end = range_start + chunk_size - 1;
248     if (range_end != -1 && range_end < chunk_end)
249       chunk_end = range_end;
250   } else {
251     chunk_end = range_end;
252   }
253
254   GST_DEBUG_OBJECT (stream,
255       "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
256       " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
257
258   ret =
259       gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
260       range_start, chunk_end);
261   if (ret != GST_FLOW_OK) {
262     GST_DEBUG_OBJECT (stream,
263         "Stopping stream due to begin download failure - ret %s",
264         gst_flow_get_name (ret));
265     gst_adaptive_demux2_stream_stop (stream);
266     return FALSE;
267   }
268
269   return TRUE;
270 }
271
272 static void
273 drain_inactive_tracks (GstAdaptiveDemux2Stream * stream)
274 {
275   GList *iter;
276   GstAdaptiveDemux *demux = stream->demux;
277
278   TRACKS_LOCK (demux);
279   for (iter = stream->tracks; iter; iter = iter->next) {
280     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
281     if (!track->selected) {
282       gst_adaptive_demux_track_drain_to (track,
283           demux->priv->global_output_position);
284     }
285   }
286
287   TRACKS_UNLOCK (demux);
288 }
289
290 /* Called to complete a download, either due to failure or completion
291  * Should set up the next download if necessary */
292 static void
293 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
294     stream, GstFlowReturn ret, GError * err)
295 {
296   GstAdaptiveDemux2StreamClass *klass =
297       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
298
299   GST_DEBUG_OBJECT (stream,
300       "%s download finish: %d %s - err: %p", uritype (stream), ret,
301       gst_flow_get_name (ret), err);
302
303   stream->download_finished = TRUE;
304
305   /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
306    * which can look at the last_ret - so make sure it's stored before calling that.
307    * Also, for not-linked or other errors passed in that are going to make
308    * this stream stop, we'll need to store it */
309   stream->last_ret = ret;
310
311   if (err) {
312     g_clear_error (&stream->last_error);
313     stream->last_error = g_error_copy (err);
314   }
315
316   /* For actual errors, stop now, no need to call finish_fragment and get
317    * confused if it returns a non-error status, but if EOS was passed in,
318    * continue and check whether finish_fragment() says we've finished
319    * the whole manifest or just this fragment */
320   if (ret < 0 && ret != GST_FLOW_EOS) {
321     GST_INFO_OBJECT (stream,
322         "Stopping stream due to error ret %s", gst_flow_get_name (ret));
323     gst_adaptive_demux2_stream_stop (stream);
324     return;
325   }
326
327   /* Handle all the possible flow returns here: */
328   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
329     /* We lost sync, seek back to live and return */
330     GST_WARNING_OBJECT (stream, "Lost sync when downloading");
331     gst_adaptive_demux_handle_lost_sync (stream->demux);
332     return;
333   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
334     /* The sub-class wants to stop the fragment immediately */
335     stream->fragment.finished = TRUE;
336     ret = klass->finish_fragment (stream);
337
338     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
339         gst_flow_get_name (ret));
340   } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
341     GST_DEBUG_OBJECT (stream, "Restarting download as requested");
342     /* Just mark the fragment as finished */
343     stream->fragment.finished = TRUE;
344     ret = GST_FLOW_OK;
345   } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
346       || !klass->need_another_chunk (stream)
347       || stream->fragment.chunk_size == 0) {
348     stream->fragment.finished = TRUE;
349     ret = klass->finish_fragment (stream);
350
351     GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
352         gst_flow_get_name (ret));
353   } else if (stream->fragment.chunk_size != 0
354       && schedule_another_chunk (stream)) {
355     /* Another download has already begun, no need to queue anything below */
356     return;
357   }
358
359   /* For HLS, we might be enqueueing data into tracks that aren't
360    * selected. Drain those ones out */
361   drain_inactive_tracks (stream);
362
363   /* Now that we've called finish_fragment we can clear these flags the
364    * sub-class might have checked */
365   if (stream->downloading_header) {
366     stream->need_header = FALSE;
367     stream->downloading_header = FALSE;
368   } else if (stream->downloading_index) {
369     stream->need_index = FALSE;
370     stream->downloading_index = FALSE;
371     /* Restart the fragment again now that header + index were loaded
372      * so that get_fragment_info() will be called again */
373     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
374   } else {
375     /* Finishing a fragment data download. Try for another */
376     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
377   }
378
379   /* if GST_FLOW_EOS was passed in that means this download is finished,
380    * but it's the result returned from finish_fragment() we really care
381    * about, as that tells us if the manifest has run out of fragments
382    * to load */
383   if (ret == GST_FLOW_EOS) {
384     stream->last_ret = ret;
385
386     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
387     return;
388   }
389
390   /* Now finally, if ret is anything other than success, we should stop this
391    * stream */
392   if (ret < 0) {
393     GST_DEBUG_OBJECT (stream,
394         "Stopping stream due to finish fragment ret %s",
395         gst_flow_get_name (ret));
396     gst_adaptive_demux2_stream_stop (stream);
397     return;
398   }
399
400   /* Clear the last_ret marker before starting a fresh download */
401   stream->last_ret = GST_FLOW_OK;
402
403   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
404   stream->pending_cb_id =
405       gst_adaptive_demux_loop_call (stream->demux->priv->scheduler_task,
406       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
407       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
408 }
409
410 /* Must be called from the scheduler context */
411 void
412 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
413     GError * err)
414 {
415   GstAdaptiveDemux *demux = stream->demux;
416
417   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
418     return;
419
420   downloadhelper_cancel_request (demux->download_helper,
421       stream->download_request);
422
423   /* cancellation is async, so recycle our download request to avoid races */
424   download_request_unref (stream->download_request);
425   stream->download_request = download_request_new ();
426
427   gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
428       err);
429 }
430
431 static void
432 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux2Stream * stream,
433     gboolean first_and_live)
434 {
435   GstAdaptiveDemux *demux = stream->demux;
436   GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
437   GstClockTime offset =
438       gst_adaptive_demux2_stream_get_presentation_offset (stream);
439
440   /* FIXME: Add a helper function to retrieve the demuxer segment
441    * using the SEGMENT_LOCK */
442   stream->parse_segment = demux->segment;
443
444   /* The demuxer segment is just built from seek events, but for each stream
445    * we have to adjust segments according to the current period and the
446    * stream specific presentation time offset.
447    *
448    * For each period, buffer timestamps start again from 0. Additionally the
449    * buffer timestamps are shifted by the stream specific presentation time
450    * offset, so the first buffer timestamp of a period is 0 + presentation
451    * time offset. If the stream contains timestamps itself, this is also
452    * supposed to be the presentation time stored inside the stream.
453    *
454    * The stream time over periods is supposed to be continuous, that is the
455    * buffer timestamp 0 + presentation time offset should map to the start
456    * time of the current period.
457    *
458    *
459    * The adjustment of the stream segments as such works the following.
460    *
461    * If the demuxer segment start is bigger than the period start, this
462    * means that we have to drop some media at the beginning of the current
463    * period, e.g. because a seek into the middle of the period has
464    * happened. The amount of media to drop is the difference between the
465    * period start and the demuxer segment start, and as each period starts
466    * again from 0, this difference is going to be the actual stream's
467    * segment start. As all timestamps of the stream are shifted by the
468    * presentation time offset, we will also have to move the segment start
469    * by that offset.
470    *
471    * Likewise, the demuxer segment stop value is adjusted in the same
472    * fashion.
473    *
474    * Now the running time and stream time at the stream's segment start has
475    * to be the one that is stored inside the demuxer's segment, which means
476    * that segment.base and segment.time have to be copied over (done just
477    * above)
478    *
479    *
480    * If the demuxer segment start is smaller than the period start time,
481    * this means that the whole period is inside the segment. As each period
482    * starts timestamps from 0, and additionally timestamps are shifted by
483    * the presentation time offset, the stream's first timestamp (and as such
484    * the stream's segment start) has to be the presentation time offset.
485    * The stream time at the segment start is supposed to be the stream time
486    * of the period start according to the demuxer segment, so the stream
487    * segment's time would be set to that. The same goes for the stream
488    * segment's base, which is supposed to be the running time of the period
489    * start according to the demuxer's segment.
490    *
491    * The same logic applies for negative rates with the segment stop and
492    * the period stop time (which gets clamped).
493    *
494    *
495    * For the first case where not the complete period is inside the segment,
496    * the segment time and base as calculated by the second case would be
497    * equivalent.
498    */
499   GST_DEBUG_OBJECT (stream, "Using demux segment %" GST_SEGMENT_FORMAT,
500       &stream->parse_segment);
501
502   GST_DEBUG_OBJECT (demux,
503       "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
504       GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
505   /* note for readers:
506    * Since stream->parse_segment is initially a copy of demux->segment,
507    * only the values that need updating are modified below. */
508   if (first_and_live) {
509     /* If first and live, demuxer did seek to the current position already */
510     stream->parse_segment.start = demux->segment.start - period_start + offset;
511     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
512       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
513     /* FIXME : Do we need to handle negative rates for this ? */
514     stream->parse_segment.position = stream->parse_segment.start;
515   } else if (demux->segment.start > period_start) {
516     /* seek within a period */
517     stream->parse_segment.start = demux->segment.start - period_start + offset;
518     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
519       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
520     if (stream->parse_segment.rate >= 0)
521       stream->parse_segment.position = offset;
522     else
523       stream->parse_segment.position = stream->parse_segment.stop;
524   } else {
525     stream->parse_segment.start = offset;
526     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
527       stream->parse_segment.stop = demux->segment.stop - period_start + offset;
528     if (stream->parse_segment.rate >= 0) {
529       stream->parse_segment.position = offset;
530       stream->parse_segment.base =
531           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
532           period_start);
533     } else {
534       stream->parse_segment.position = stream->parse_segment.stop;
535       stream->parse_segment.base =
536           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
537           period_start + demux->segment.stop - demux->segment.start);
538     }
539     stream->parse_segment.time =
540         gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
541         period_start);
542   }
543
544   stream->send_segment = TRUE;
545
546   GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
547       &stream->parse_segment);
548 }
549
550 /* Segment lock hold */
551 static void
552 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
553     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
554 {
555   GstClockTimeDiff pos;
556
557   GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
558       GST_STIME_ARGS (stream->fragment.stream_time));
559
560   pos = stream->fragment.stream_time;
561
562   if (GST_CLOCK_STIME_IS_VALID (pos)) {
563     GstClockTime offset =
564         gst_adaptive_demux2_stream_get_presentation_offset (stream);
565
566     pos += offset;
567
568     if (pos < 0) {
569       GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
570       pos = 0;
571     }
572
573     GST_BUFFER_PTS (buffer) = pos;
574   } else {
575     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
576   }
577
578   GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
579       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
580 }
581
582 /* Must be called from the scheduler context */
583 GstFlowReturn
584 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
585     GstBuffer * buffer)
586 {
587   GstAdaptiveDemux *demux = stream->demux;
588   GstFlowReturn ret = GST_FLOW_OK;
589   gboolean discont = FALSE;
590   /* Pending events */
591   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
592       NULL, *stream_start = NULL, *buffer_gap = NULL;
593   GList *pending_events = NULL;
594
595   if (stream->compute_segment) {
596     gst_adaptive_demux2_stream_prepare_segment (stream, stream->first_and_live);
597     stream->compute_segment = FALSE;
598     stream->first_and_live = FALSE;
599   }
600
601   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
602     GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
603     buffer_gap =
604         gst_event_new_gap (GST_BUFFER_PTS (buffer),
605         GST_BUFFER_DURATION (buffer));
606   }
607
608   if (stream->first_fragment_buffer) {
609     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
610     if (demux->segment.rate < 0)
611       /* Set DISCONT flag for every first buffer in reverse playback mode
612        * as each fragment for its own has to be reversed */
613       discont = TRUE;
614     update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
615     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
616
617     GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
618
619     /* Do we need to inject STREAM_START and SEGMENT events ?
620      *
621      * This can happen when a stream is restarted, and also when switching to a
622      * variant which needs a header (in which case downloading_header will be
623      * TRUE)
624      */
625     if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
626       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
627       pending_segment = gst_event_new_segment (&stream->parse_segment);
628       gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
629       stream->send_segment = FALSE;
630       GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
631       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
632       stream_start = gst_event_new_stream_start ("bogus");
633       if (demux->have_group_id)
634         gst_event_set_group_id (stream_start, demux->group_id);
635     }
636   } else {
637     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
638   }
639   stream->first_fragment_buffer = FALSE;
640
641   if (stream->discont) {
642     discont = TRUE;
643     stream->discont = FALSE;
644   }
645
646   if (discont) {
647     GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
648     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
649   } else {
650     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
651   }
652
653   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
654   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
655   if (G_UNLIKELY (stream->pending_caps)) {
656     pending_caps = gst_event_new_caps (stream->pending_caps);
657     gst_caps_unref (stream->pending_caps);
658     stream->pending_caps = NULL;
659   }
660
661   if (G_UNLIKELY (stream->pending_tags)) {
662     GstTagList *tags = stream->pending_tags;
663
664     stream->pending_tags = NULL;
665
666     if (tags)
667       pending_tags = gst_event_new_tag (tags);
668   }
669   if (G_UNLIKELY (stream->pending_events)) {
670     pending_events = stream->pending_events;
671     stream->pending_events = NULL;
672   }
673
674   /* Do not push events or buffers holding the manifest lock */
675   if (G_UNLIKELY (stream_start)) {
676     GST_DEBUG_OBJECT (stream,
677         "Setting stream start: %" GST_PTR_FORMAT, stream_start);
678     gst_pad_send_event (stream->parsebin_sink, stream_start);
679   }
680   if (G_UNLIKELY (pending_caps)) {
681     GST_DEBUG_OBJECT (stream,
682         "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
683     gst_pad_send_event (stream->parsebin_sink, pending_caps);
684   }
685   if (G_UNLIKELY (pending_segment)) {
686     GST_DEBUG_OBJECT (stream,
687         "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
688     gst_pad_send_event (stream->parsebin_sink, pending_segment);
689   }
690   if (G_UNLIKELY (pending_tags)) {
691     GST_DEBUG_OBJECT (stream,
692         "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
693     gst_pad_send_event (stream->parsebin_sink, pending_tags);
694   }
695   while (pending_events != NULL) {
696     GstEvent *event = pending_events->data;
697
698     GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
699     if (!gst_pad_send_event (stream->parsebin_sink, event))
700       GST_ERROR_OBJECT (stream, "Failed to send pending event");
701
702     pending_events = g_list_delete_link (pending_events, pending_events);
703   }
704
705   GST_DEBUG_OBJECT (stream,
706       "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
707       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
708       GST_BUFFER_OFFSET (buffer));
709
710   ret = gst_pad_chain (stream->parsebin_sink, buffer);
711
712   if (buffer_gap) {
713     GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
714     gst_pad_send_event (stream->parsebin_sink, buffer_gap);
715   }
716
717   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
718     GST_LOG_OBJECT (demux, "Stream was cancelled");
719     return GST_FLOW_FLUSHING;
720   }
721
722   GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
723
724   return ret;
725 }
726
727 static GstFlowReturn
728 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
729     GstBuffer * buffer)
730 {
731   GstAdaptiveDemux *demux = stream->demux;
732   GstAdaptiveDemux2StreamClass *klass =
733       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
734   GstFlowReturn ret = GST_FLOW_OK;
735
736   /* do not make any changes if the stream is cancelled */
737   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
738     GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
739     gst_buffer_unref (buffer);
740     return GST_FLOW_FLUSHING;
741   }
742
743   /* starting_fragment is set to TRUE at the beginning of
744    * _stream_download_fragment()
745    * /!\ If there is a header/index being downloaded, then this will
746    * be TRUE for the first one ... but FALSE for the remaining ones,
747    * including the *actual* fragment ! */
748   if (stream->starting_fragment) {
749     stream->starting_fragment = FALSE;
750     if (klass->start_fragment != NULL && !klass->start_fragment (stream))
751       return GST_FLOW_ERROR;
752   }
753
754   stream->download_total_bytes += gst_buffer_get_size (buffer);
755
756   GST_TRACE_OBJECT (stream,
757       "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
758       gst_buffer_get_size (buffer));
759
760   ret = klass->data_received (stream, buffer);
761
762   if (ret != GST_FLOW_OK) {
763     GST_DEBUG_OBJECT (stream, "data_received returned %s",
764         gst_flow_get_name (ret));
765
766     if (ret == GST_FLOW_FLUSHING) {
767       /* do not make any changes if the stream is cancelled */
768       if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED) {
769         GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
770         return ret;
771       }
772     }
773
774     if (ret < GST_FLOW_EOS) {
775       GstEvent *eos = gst_event_new_eos ();
776       GST_ELEMENT_FLOW_ERROR (demux, ret);
777
778       GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
779
780       /* TODO push this on all pads */
781       gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
782       gst_pad_send_event (stream->parsebin_sink, eos);
783       ret = GST_FLOW_ERROR;
784
785       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
786     }
787   }
788
789   return ret;
790 }
791
792 /* Calculate the low and high download buffering watermarks
793  * in time as MAX (low-watermark-time, low-watermark-fragments) and
794  * MIN (high-watermark-time, high-watermark-fragments) respectively
795  */
796 static void
797 calculate_track_thresholds (GstAdaptiveDemux * demux,
798     GstAdaptiveDemux2Stream * stream,
799     GstClockTime fragment_duration, GstClockTime * low_threshold,
800     GstClockTime * high_threshold)
801 {
802   GST_OBJECT_LOCK (demux);
803   *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
804   if (*low_threshold == 0 ||
805       (demux->buffering_low_watermark_time != 0
806           && demux->buffering_low_watermark_time > *low_threshold)) {
807     *low_threshold = demux->buffering_low_watermark_time;
808   }
809
810   if (*low_threshold == 0) {
811     /* This implies both low level properties were 0, the default is 10s unless
812      * the subclass has specified a recommended buffering threshold */
813     *low_threshold = 10 * GST_SECOND;
814     if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
815       *low_threshold =
816           MIN (stream->recommended_buffering_threshold, *low_threshold);
817   }
818
819   *high_threshold =
820       demux->buffering_high_watermark_fragments * fragment_duration;
821   if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
822           && demux->buffering_high_watermark_time < *high_threshold)) {
823     *high_threshold = demux->buffering_high_watermark_time;
824   }
825
826   /* Make sure the low and high thresholds are less than the maximum buffering
827    * time */
828   if (*high_threshold == 0 ||
829       (demux->max_buffering_time != 0
830           && demux->max_buffering_time < *high_threshold)) {
831     *high_threshold = demux->max_buffering_time;
832   }
833
834   if (*low_threshold == 0 ||
835       (demux->max_buffering_time != 0
836           && demux->max_buffering_time < *low_threshold)) {
837     *low_threshold = demux->max_buffering_time;
838   }
839
840   /* Make sure the high threshold is higher than (or equal to) the low threshold.
841    * It's OK if they are the same, as the minimum download is 1 fragment */
842   if (*high_threshold == 0 ||
843       (*low_threshold != 0 && *low_threshold > *high_threshold)) {
844     *high_threshold = *low_threshold;
845   }
846
847   GST_OBJECT_UNLOCK (demux);
848 }
849
850 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
851 static gboolean
852 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
853     GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
854 {
855   gboolean need_to_wait = TRUE;
856   gboolean have_any_tracks = FALSE;
857   gboolean have_active_tracks = FALSE;
858   gboolean have_filled_inactive = FALSE;
859   gboolean update_buffering = FALSE;
860
861   GstClockTime low_threshold = 0, high_threshold = 0;
862   GList *iter;
863
864   calculate_track_thresholds (demux, stream, fragment_duration,
865       &low_threshold, &high_threshold);
866   GST_DEBUG_OBJECT (stream,
867       "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
868       " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
869       GST_TIME_ARGS (high_threshold),
870       GST_TIME_ARGS (stream->recommended_buffering_threshold));
871
872   /* If there are no tracks at all, don't wait. If there are no active
873    * tracks, keep filling until at least one track is full. If there
874    * are active tracks, require that they are all full */
875   TRACKS_LOCK (demux);
876   for (iter = stream->tracks; iter; iter = iter->next) {
877     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
878
879     /* Update the buffering threshold if it changed by more than a second */
880     if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
881       GST_DEBUG_OBJECT (stream, "Updating threshold");
882       /* The buffering threshold for this track changed, make sure to
883        * re-check buffering status */
884       update_buffering = TRUE;
885       track->buffering_threshold = low_threshold;
886     }
887
888     have_any_tracks = TRUE;
889     if (track->active)
890       have_active_tracks = TRUE;
891
892     if (track->level_time < high_threshold) {
893       if (track->active) {
894         need_to_wait = FALSE;
895         GST_DEBUG_OBJECT (stream,
896             "track %s has level %" GST_TIME_FORMAT
897             " - needs more data (target %" GST_TIME_FORMAT
898             ") (fragment duration %" GST_TIME_FORMAT ")",
899             track->stream_id, GST_TIME_ARGS (track->level_time),
900             GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
901         continue;
902       }
903     } else if (!track->active) {        /* track is over threshold and inactive */
904       have_filled_inactive = TRUE;
905     }
906
907     GST_DEBUG_OBJECT (stream,
908         "track %s active (%d) has level %" GST_TIME_FORMAT,
909         track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
910   }
911
912   /* If there are no tracks, don't wait (we might need data to create them),
913    * or if there are active tracks that need more data to hit the threshold,
914    * don't wait. Otherwise it means all active tracks are full and we should wait */
915   if (!have_any_tracks) {
916     GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
917     need_to_wait = FALSE;
918   } else if (!have_active_tracks && !have_filled_inactive) {
919     GST_DEBUG_OBJECT (stream,
920         "have only inactive tracks that need more data - not waiting");
921     need_to_wait = FALSE;
922   }
923
924   if (need_to_wait) {
925     stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
926
927     for (iter = stream->tracks; iter; iter = iter->next) {
928       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
929
930       GST_DEBUG_OBJECT (stream,
931           "Waiting for queued data on track %s to drop below %"
932           GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
933           track->stream_id, GST_TIME_ARGS (high_threshold),
934           GST_TIME_ARGS (fragment_duration));
935
936       /* we want to get woken up when the global output position reaches
937        * a point where the input is closer than "high_threshold" to needing
938        * output, so we can put more data in */
939       GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
940
941       if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
942           wakeup_time < stream->next_input_wakeup_time) {
943         stream->next_input_wakeup_time = wakeup_time;
944
945         GST_DEBUG_OBJECT (stream,
946             "Track %s level %" GST_TIME_FORMAT ". Input at position %"
947             GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
948             GST_TIME_FORMAT, track->stream_id,
949             GST_TIME_ARGS (track->level_time),
950             GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
951             GST_TIME_ARGS (demux->priv->global_output_position));
952       }
953     }
954
955     if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
956       GST_DEBUG_OBJECT (stream,
957           "Next input wakeup time is now %" GST_TIME_FORMAT,
958           GST_TIME_ARGS (stream->next_input_wakeup_time));
959
960       /* If this stream needs waking up sooner than any other current one,
961        * update the period wakeup time, which is what the output loop
962        * will check */
963       GstAdaptiveDemuxPeriod *period = stream->period;
964       if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
965           period->next_input_wakeup_time > stream->next_input_wakeup_time) {
966         period->next_input_wakeup_time = stream->next_input_wakeup_time;
967       }
968     }
969   }
970
971   if (update_buffering) {
972     demux_update_buffering_locked (demux);
973     demux_post_buffering_locked (demux);
974   }
975
976   TRACKS_UNLOCK (demux);
977
978   return need_to_wait;
979 }
980
981 static GstAdaptiveDemuxTrack *
982 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
983 {
984   GList *tmp;
985   GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
986   gint num_possible_tracks = 0;
987   GstStream *gst_stream;
988   const gchar *internal_stream_id;
989   GstStreamType stream_type;
990
991   gst_stream = gst_pad_get_stream (pad);
992
993   /* FIXME: Edward: Added assertion because I don't see in what cases we would
994    * end up with a pad from parsebin which wouldn't have an associated
995    * GstStream. */
996   g_assert (gst_stream);
997
998   internal_stream_id = gst_stream_get_stream_id (gst_stream);
999   stream_type = gst_stream_get_stream_type (gst_stream);
1000
1001   GST_DEBUG_OBJECT (pad,
1002       "Trying to match pad from parsebin with internal streamid %s and stream %"
1003       GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id), gst_stream);
1004
1005   /* Try to match directly by the track's pending upstream_stream_id */
1006   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
1007     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1008
1009     if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
1010       continue;
1011
1012     GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
1013         track->upstream_stream_id);
1014
1015     if (first_matched_track == NULL)
1016       first_matched_track = track;
1017     num_possible_tracks++;
1018
1019     /* If this track has a desired upstream stream id, match on it */
1020     if (track->upstream_stream_id == NULL ||
1021         g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
1022       /* This is not the track for this pad */
1023       continue;
1024     }
1025
1026     /* Remove pending upstream id (we have matched it for the pending
1027      * stream_id) */
1028     g_free (track->upstream_stream_id);
1029     track->upstream_stream_id = NULL;
1030     found_track = track;
1031     break;
1032   }
1033
1034   if (found_track == NULL) {
1035     /* If we arrive here, it means the stream is switching pads after
1036      * the stream has already started running */
1037     /* No track is currently waiting for this particular stream id -
1038      * try and match an existing linked track. If there's only 1 possible,
1039      * take it. */
1040     if (num_possible_tracks == 1 && first_matched_track != NULL) {
1041       GST_LOG_OBJECT (pad, "Only one possible track to link to");
1042       found_track = first_matched_track;
1043     }
1044   }
1045
1046   if (found_track == NULL) {
1047     /* TODO: There are multiple possible tracks, need to match based
1048      * on language code and caps. Have you found a stream like this? */
1049     GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1050   }
1051
1052   if (found_track != NULL) {
1053     if (!gst_pad_is_linked (found_track->sinkpad)) {
1054       GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1055           found_track->sinkpad);
1056
1057       if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1058         GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1059         /* FIXME : Do something if we can't link ? */
1060       }
1061     } else {
1062       /* Store pad as pending link */
1063       GST_LOG_OBJECT (pad,
1064           "Remembering pad to be linked when current pad is unlinked");
1065       g_assert (found_track->pending_srcpad == NULL);
1066       found_track->pending_srcpad = gst_object_ref (pad);
1067     }
1068   }
1069
1070   if (gst_stream)
1071     gst_object_unref (gst_stream);
1072
1073   return found_track;
1074 }
1075
1076 static void
1077 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1078     GstAdaptiveDemux2Stream * stream)
1079 {
1080   GList *iter;
1081   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1082
1083   /* Remove from pending source pad */
1084   TRACKS_LOCK (stream->demux);
1085   for (iter = stream->tracks; iter; iter = iter->next) {
1086     GstAdaptiveDemuxTrack *track = iter->data;
1087     if (track->pending_srcpad == pad) {
1088       gst_object_unref (track->pending_srcpad);
1089       track->pending_srcpad = NULL;
1090       break;
1091     }
1092   }
1093   TRACKS_UNLOCK (stream->demux);
1094 }
1095
1096 static void
1097 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1098     GstAdaptiveDemux2Stream * stream)
1099 {
1100   if (!GST_PAD_IS_SRC (pad))
1101     return;
1102
1103   GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1104
1105   if (!match_parsebin_to_track (stream, pad))
1106     GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1107
1108   GST_DEBUG_OBJECT (stream->demux, "Done linking");
1109 }
1110
1111 static void
1112 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1113     GstElement * element, GstAdaptiveDemux * demux)
1114 {
1115   if (G_OBJECT_TYPE (element) == tsdemux_type) {
1116     GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1117     g_object_set (element, "ignore-pcr", TRUE, NULL);
1118   }
1119 }
1120
1121 /* must be called with manifest_lock taken */
1122 static gboolean
1123 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1124 {
1125   GstAdaptiveDemux *demux = stream->demux;
1126
1127   if (stream->parsebin == NULL) {
1128     GstEvent *event;
1129
1130     GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1131
1132     /* Workaround to detect if tsdemux is being used */
1133     if (tsdemux_type == 0) {
1134       GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1135       if (element) {
1136         tsdemux_type = G_OBJECT_TYPE (element);
1137         gst_object_unref (element);
1138       }
1139     }
1140
1141     stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1142     if (tsdemux_type)
1143       g_signal_connect (stream->parsebin, "deep-element-added",
1144           (GCallback) parsebin_deep_element_added_cb, demux);
1145     gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1146     stream->parsebin_sink =
1147         gst_element_get_static_pad (stream->parsebin, "sink");
1148     stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1149         G_CALLBACK (parsebin_pad_added_cb), stream);
1150     stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1151         G_CALLBACK (parsebin_pad_removed_cb), stream);
1152
1153     event = gst_event_new_stream_start ("bogus");
1154     if (demux->have_group_id)
1155       gst_event_set_group_id (event, demux->group_id);
1156
1157     gst_pad_send_event (stream->parsebin_sink, event);
1158
1159     /* Not sure if these need to be outside the manifest lock: */
1160     gst_element_sync_state_with_parent (stream->parsebin);
1161     stream->last_status_code = 200;     /* default to OK */
1162   }
1163   return TRUE;
1164 }
1165
1166 static void
1167 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1168     GstAdaptiveDemux2Stream * stream)
1169 {
1170 }
1171
1172 static void
1173 on_download_error (DownloadRequest * request, DownloadRequestState state,
1174     GstAdaptiveDemux2Stream * stream)
1175 {
1176   GstAdaptiveDemux *demux = stream->demux;
1177   guint last_status_code = request->status_code;
1178   gboolean live;
1179
1180   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1181     GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1182         stream->state);
1183     return;
1184   }
1185
1186   stream->download_active = FALSE;
1187   stream->last_status_code = last_status_code;
1188
1189   GST_DEBUG_OBJECT (stream,
1190       "Download finished with error, request state %d http status %u, dc %d",
1191       request->state, last_status_code, stream->download_error_count);
1192
1193   live = gst_adaptive_demux_is_live (demux);
1194   if (((last_status_code / 100 == 4 && live)
1195           || last_status_code / 100 == 5)) {
1196     /* 4xx/5xx */
1197     /* if current position is before available start, switch to next */
1198     if (!gst_adaptive_demux2_stream_has_next_fragment (stream))
1199       goto flushing;
1200
1201     if (live) {
1202       gint64 range_start, range_stop;
1203
1204       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1205               &range_stop))
1206         goto flushing;
1207
1208       if (demux->segment.position < range_start) {
1209         GstFlowReturn ret;
1210
1211         GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1212         gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1213
1214         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1215
1216         ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
1217         GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1218             gst_flow_get_name (ret));
1219
1220         if (ret == GST_FLOW_OK)
1221           goto again;
1222
1223       } else if (demux->segment.position > range_stop) {
1224         /* wait a bit to be in range, we don't have any locks at that point */
1225         GstClockTime wait_time =
1226             gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
1227         if (wait_time > 0) {
1228           GST_DEBUG_OBJECT (stream,
1229               "Download waiting for %" GST_TIME_FORMAT,
1230               GST_TIME_ARGS (wait_time));
1231           g_assert (stream->pending_cb_id == 0);
1232           GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1233           stream->pending_cb_id =
1234               gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1235               wait_time,
1236               (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1237               gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1238         }
1239       }
1240     }
1241
1242   flushing:
1243     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1244       /* looks like there is no way of knowing when a live stream has ended
1245        * Have to assume we are falling behind and cause a manifest reload */
1246       GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1247       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1248       return;
1249     }
1250   } else if (!gst_adaptive_demux2_stream_has_next_fragment (stream)) {
1251     /* If this is the last fragment, consider failures EOS and not actual
1252      * errors. Due to rounding errors in the durations, the last fragment
1253      * might not actually exist */
1254     GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1255     gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1256     return;
1257   } else {
1258     /* retry same segment */
1259     if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1260       gst_adaptive_demux2_stream_error (stream);
1261       return;
1262     }
1263     goto again;
1264   }
1265
1266 again:
1267   /* wait a short time in case the server needs a bit to recover */
1268   GST_LOG_OBJECT (stream,
1269       "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1270   g_assert (stream->pending_cb_id == 0);
1271   stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND,  /* Retry in 10 ms */
1272       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1273       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1274 }
1275
1276 static void
1277 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1278     DownloadRequest * request)
1279 {
1280   GstClockTimeDiff last_download_duration;
1281   guint64 fragment_bytes_downloaded = request->content_received;
1282
1283   /* The stream last_download time tracks the full download time for now */
1284   stream->last_download_time =
1285       GST_CLOCK_DIFF (request->download_request_time,
1286       request->download_end_time);
1287
1288   /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1289   last_download_duration =
1290       GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1291
1292   /* If the entire response arrived in the first buffer
1293    * though, include the request time to get a valid
1294    * bitrate estimate */
1295   if (last_download_duration < 2 * stream->last_download_time)
1296     last_download_duration = stream->last_download_time;
1297
1298   if (last_download_duration > 0) {
1299     stream->last_bitrate =
1300         gst_util_uint64_scale (fragment_bytes_downloaded,
1301         8 * GST_SECOND, last_download_duration);
1302
1303     GST_DEBUG_OBJECT (stream,
1304         "Updated stream bitrate. %" G_GUINT64_FORMAT
1305         " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1306         G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1307         GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1308   }
1309 }
1310
1311 static void
1312 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1313     GstAdaptiveDemux2Stream * stream)
1314 {
1315   GstAdaptiveDemux *demux = stream->demux;
1316   GstBuffer *buffer = download_request_take_buffer (request);
1317
1318   if (buffer) {
1319     GstFlowReturn ret;
1320
1321     GST_DEBUG_OBJECT (stream,
1322         "Handling buffer of %" G_GSIZE_FORMAT
1323         " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1324         G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1325         request->content_received, request->content_length);
1326
1327     /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1328     download_request_unlock (request);
1329     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1330     download_request_lock (request);
1331
1332     if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1333       return;
1334
1335     if (ret != GST_FLOW_OK) {
1336       GST_DEBUG_OBJECT (stream,
1337           "Buffer parsing returned: %d %s. Aborting download", ret,
1338           gst_flow_get_name (ret));
1339
1340       if (!stream->downloading_header && !stream->downloading_index)
1341         update_stream_bitrate (stream, request);
1342
1343       downloadhelper_cancel_request (demux->download_helper, request);
1344
1345       /* cancellation is async, so recycle our download request to avoid races */
1346       download_request_unref (stream->download_request);
1347       stream->download_request = download_request_new ();
1348
1349       gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1350     }
1351   }
1352 }
1353
1354 static void
1355 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1356     GstAdaptiveDemux2Stream * stream)
1357 {
1358   GstFlowReturn ret = GST_FLOW_OK;
1359   GstBuffer *buffer;
1360
1361   stream->download_active = FALSE;
1362
1363   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1364     GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1365         stream->state);
1366     return;
1367   }
1368
1369   GST_DEBUG_OBJECT (stream,
1370       "Stream %p %s download for %s is complete with state %d",
1371       stream, uritype (stream), request->uri, request->state);
1372
1373   /* Update bitrate for fragment downloads */
1374   if (!stream->downloading_header && !stream->downloading_index)
1375     update_stream_bitrate (stream, request);
1376
1377   buffer = download_request_take_buffer (request);
1378   if (buffer)
1379     ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1380
1381   GST_DEBUG_OBJECT (stream,
1382       "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1383       request->uri, ret, gst_flow_get_name (ret), stream->state);
1384
1385   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1386     return;
1387
1388   g_assert (stream->pending_cb_id == 0);
1389   gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1390 }
1391
1392 /* must be called from the scheduler context
1393  *
1394  * Will submit the request only, which will complete asynchronously
1395  */
1396 static GstFlowReturn
1397 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1398     GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1399     gint64 end)
1400 {
1401   DownloadRequest *request = stream->download_request;
1402
1403   GST_DEBUG_OBJECT (demux,
1404       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1405       uritype (stream), uri, start, end);
1406
1407   if (!gst_adaptive_demux2_stream_create_parser (stream))
1408     return GST_FLOW_ERROR;
1409
1410   /* Configure our download request */
1411   download_request_set_uri (request, uri, start, end);
1412
1413   if (stream->downloading_header || stream->downloading_index) {
1414     download_request_set_callbacks (request,
1415         (DownloadRequestEventCallback) on_download_complete,
1416         (DownloadRequestEventCallback) on_download_error,
1417         (DownloadRequestEventCallback) on_download_cancellation,
1418         (DownloadRequestEventCallback) NULL, stream);
1419   } else {
1420     download_request_set_callbacks (request,
1421         (DownloadRequestEventCallback) on_download_complete,
1422         (DownloadRequestEventCallback) on_download_error,
1423         (DownloadRequestEventCallback) on_download_cancellation,
1424         (DownloadRequestEventCallback) on_download_progress, stream);
1425   }
1426
1427   if (!downloadhelper_submit_request (demux->download_helper,
1428           demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1429     return GST_FLOW_ERROR;
1430
1431   stream->download_active = TRUE;
1432
1433   return GST_FLOW_OK;
1434 }
1435
1436 /* must be called from the scheduler context */
1437 static GstFlowReturn
1438 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1439 {
1440   GstAdaptiveDemux *demux = stream->demux;
1441   GstAdaptiveDemux2StreamClass *klass =
1442       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
1443   gchar *url = NULL;
1444
1445   /* FIXME :  */
1446   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1447   if (stream->starting_fragment) {
1448     GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1449         stream->fragment.uri ? "FRAGMENT " : "",
1450         stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1451         stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1452
1453     if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1454         stream->fragment.index_uri == NULL)
1455       goto no_url_error;
1456
1457     stream->first_fragment_buffer = TRUE;
1458     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1459   }
1460
1461   if (stream->need_header && stream->fragment.header_uri != NULL) {
1462
1463     /* Set the need_index flag when we start the header if we'll also need the index */
1464     stream->need_index = (stream->fragment.index_uri != NULL);
1465
1466     GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1467         G_GINT64_FORMAT, stream->fragment.header_uri,
1468         stream->fragment.header_range_start, stream->fragment.header_range_end);
1469
1470     stream->downloading_header = TRUE;
1471
1472     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1473         stream->fragment.header_uri, stream->fragment.header_range_start,
1474         stream->fragment.header_range_end);
1475   }
1476
1477   /* check if we have an index */
1478   if (stream->need_index && stream->fragment.index_uri != NULL) {
1479     GST_DEBUG_OBJECT (stream,
1480         "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1481         stream->fragment.index_uri,
1482         stream->fragment.index_range_start, stream->fragment.index_range_end);
1483
1484     stream->downloading_index = TRUE;
1485
1486     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1487         stream->fragment.index_uri, stream->fragment.index_range_start,
1488         stream->fragment.index_range_end);
1489   }
1490
1491   url = stream->fragment.uri;
1492   GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1493   if (!url)
1494     return GST_FLOW_OK;
1495
1496   /* Download the actual fragment, either in chunks or in one go */
1497   stream->first_fragment_buffer = TRUE;
1498
1499   if (klass->need_another_chunk && klass->need_another_chunk (stream)
1500       && stream->fragment.chunk_size != 0) {
1501     /* Handle chunk downloading */
1502     gint64 range_start = stream->fragment.range_start;
1503     gint64 range_end = stream->fragment.range_end;
1504     gint chunk_size = stream->fragment.chunk_size;
1505     gint64 chunk_end;
1506
1507     /* HTTP ranges are inclusive for the end */
1508     if (chunk_size != -1) {
1509       chunk_end = range_start + chunk_size - 1;
1510       if (range_end != -1 && range_end < chunk_end)
1511         chunk_end = range_end;
1512     } else {
1513       chunk_end = range_end;
1514     }
1515
1516     GST_DEBUG_OBJECT (stream,
1517         "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1518         url, range_start, chunk_end);
1519     return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1520         range_start, chunk_end);
1521   }
1522
1523   /* regular single chunk download */
1524   stream->fragment.chunk_size = 0;
1525
1526   return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1527       stream->fragment.range_start, stream->fragment.range_end);
1528
1529 no_url_error:
1530   {
1531     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1532         (_("Failed to get fragment URL.")),
1533         ("An error happened when getting fragment URL"));
1534     return GST_FLOW_ERROR;
1535   }
1536 }
1537
1538 static gboolean
1539 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1540     GstEvent * event)
1541 {
1542   gboolean ret = TRUE;
1543   GstPad *pad;
1544
1545   /* If there's a parsebin, push the event through it */
1546   if (stream->parsebin_sink != NULL) {
1547     pad = gst_object_ref (stream->parsebin_sink);
1548     GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1549     ret = gst_pad_send_event (pad, gst_event_ref (event));
1550     gst_object_unref (pad);
1551   }
1552
1553   /* If the event is EOS, ensure that all tracks are EOS. This catches
1554    * the case where the parsebin hasn't parsed anything yet (we switched
1555    * to a never before used track right near EOS, or it didn't parse enough
1556    * to create pads and be able to send EOS through to the tracks.
1557    *
1558    * We don't need to care about any other events
1559    */
1560   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1561     GList *iter;
1562
1563     for (iter = stream->tracks; iter; iter = iter->next) {
1564       GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1565       ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1566     }
1567   }
1568
1569   gst_event_unref (event);
1570   return ret;
1571 }
1572
1573 static void
1574 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1575 {
1576   GstAdaptiveDemux *demux = stream->demux;
1577   GstMessage *msg;
1578   GstStructure *details;
1579
1580   details = gst_structure_new_empty ("details");
1581   gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1582       stream->last_status_code, NULL);
1583
1584   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1585
1586   if (stream->last_error) {
1587     gchar *debug = g_strdup_printf ("Error on stream %s",
1588         GST_OBJECT_NAME (stream));
1589     msg =
1590         gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1591         stream->last_error, debug, details);
1592     GST_ERROR_OBJECT (stream, "Download error: %s",
1593         stream->last_error->message);
1594     g_free (debug);
1595   } else {
1596     GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1597         _("Couldn't download fragments"));
1598     msg =
1599         gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1600         "Fragment downloading has failed consecutive times", details);
1601     g_error_free (err);
1602     GST_ERROR_OBJECT (stream,
1603         "Download error: Couldn't download fragments, too many failures");
1604   }
1605
1606   gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1607 }
1608
1609 /* Called when a stream reaches the end of a playback segment */
1610 static void
1611 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1612 {
1613   GstAdaptiveDemux *demux = stream->demux;
1614   GstFlowReturn combined =
1615       gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1616
1617   GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1618
1619   if (gst_adaptive_demux_has_next_period (demux)) {
1620     if (combined == GST_FLOW_EOS) {
1621       GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1622       gst_adaptive_demux_advance_period (demux);
1623     } else {
1624       /* Ensure the 'has_next_period' flag is set on the period before
1625        * pushing EOS to the stream, so that the output loop knows not
1626        * to actually output the event */
1627       GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1628       demux->input_period->has_next_period = TRUE;
1629     }
1630   }
1631
1632   if (demux->priv->outputs) {
1633     GstEvent *eos = gst_event_new_eos ();
1634
1635     GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1636     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1637
1638     gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1639     gst_adaptive_demux2_stream_push_event (stream, eos);
1640   } else {
1641     GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1642     gst_adaptive_demux2_stream_error (stream);
1643   }
1644 }
1645
1646 static gboolean
1647 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1648 {
1649   GstAdaptiveDemux *demux = stream->demux;
1650
1651   gboolean is_live = gst_adaptive_demux_is_live (demux);
1652
1653   stream->pending_cb_id = 0;
1654
1655   /* Refetch the playlist now after we waited */
1656   /* FIXME: Make this manifest update async and handle it on completion */
1657   if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1658     GST_DEBUG_OBJECT (demux, "Updated the playlist");
1659   }
1660
1661   /* We were called here from a timeout, so if the load function wants to loop
1662    * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1663    * way */
1664   while (gst_adaptive_demux2_stream_next_download (stream));
1665
1666   return G_SOURCE_REMOVE;
1667 }
1668
1669 static gboolean
1670 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1671     * stream)
1672 {
1673   /* If the state already moved on, the stream was stopped, or another track
1674    * already woke up and needed data */
1675   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1676     return G_SOURCE_REMOVE;
1677
1678   GstAdaptiveDemux *demux = stream->demux;
1679   TRACKS_LOCK (demux);
1680
1681   GList *iter;
1682   for (iter = stream->tracks; iter; iter = iter->next) {
1683     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1684
1685     /* We need to recompute the track's level_time value, as the
1686      * global output position may have advanced and reduced the
1687      * value, even without anything being dequeued yet */
1688     gst_adaptive_demux_track_update_level_locked (track);
1689
1690     GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1691         " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1692         track->stream_id, GST_TIME_ARGS (track->level_time),
1693         GST_TIME_ARGS (track->input_time),
1694         GST_TIME_ARGS (demux->priv->global_output_position));
1695   }
1696   TRACKS_UNLOCK (demux);
1697
1698   while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1699
1700   return G_SOURCE_REMOVE;
1701 }
1702
1703 void
1704 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1705     stream)
1706 {
1707   GstAdaptiveDemux *demux = stream->demux;
1708
1709   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1710
1711   GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1712
1713   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1714       (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1715       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1716 }
1717
1718 void
1719 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1720 {
1721   GstAdaptiveDemux *demux = stream->demux;
1722
1723   if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1724     return;
1725
1726   g_assert (stream->pending_cb_id == 0);
1727
1728   GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1729   stream->pending_cb_id =
1730       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1731       (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1732       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1733 }
1734
1735 static void
1736 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1737     stream)
1738 {
1739   GstAdaptiveDemux *demux = stream->demux;
1740
1741   if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1742           || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1743
1744     if (!gst_adaptive_demux_has_next_period (demux)) {
1745       /* Wait only if we can ensure current manifest has been expired.
1746        * The meaning "we have next period" *WITH* EOS is that, current
1747        * period has been ended but we can continue to the next period */
1748       GST_DEBUG_OBJECT (stream,
1749           "Live playlist EOS - waiting for manifest update");
1750       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1751       /* Clear the stream last_ret EOS state, since we're not actually EOS */
1752       if (stream->last_ret == GST_FLOW_EOS)
1753         stream->last_ret = GST_FLOW_OK;
1754       gst_adaptive_demux2_stream_wants_manifest_update (demux);
1755       return;
1756     }
1757   }
1758
1759   gst_adaptive_demux2_stream_end_of_manifest (stream);
1760 }
1761
1762 static gboolean
1763 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1764 {
1765   GstAdaptiveDemux *demux = stream->demux;
1766   gboolean live = gst_adaptive_demux_is_live (demux);
1767   GstFlowReturn ret = GST_FLOW_OK;
1768
1769   stream->pending_cb_id = 0;
1770
1771   GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1772
1773   switch (stream->state) {
1774     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1775     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1776     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1777     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1778     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1779       /* Get information about the fragment to download */
1780       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1781       ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
1782       GST_DEBUG_OBJECT (stream,
1783           "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1784
1785       if (ret == GST_FLOW_OK)
1786         stream->starting_fragment = TRUE;
1787       break;
1788     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1789       break;
1790     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1791       GST_ERROR_OBJECT (stream,
1792           "Unexpected stream state EOS. The stream should not be running now.");
1793       return FALSE;
1794     case GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED:
1795       /* The stream was stopped. Just finish up */
1796       return FALSE;
1797     default:
1798       GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1799       g_assert_not_reached ();
1800       break;
1801   }
1802
1803   if (ret == GST_FLOW_OK) {
1804     /* Wait for room in the output tracks */
1805     if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1806             stream->fragment.duration)) {
1807       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1808       return FALSE;
1809     }
1810   }
1811
1812   if (ret == GST_FLOW_OK) {
1813     /* wait for live fragments to be available */
1814     if (live) {
1815       GstClockTime wait_time =
1816           gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
1817       if (wait_time > 0) {
1818         GST_DEBUG_OBJECT (stream,
1819             "Download waiting for %" GST_TIME_FORMAT,
1820             GST_TIME_ARGS (wait_time));
1821
1822         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1823
1824         GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1825         g_assert (stream->pending_cb_id == 0);
1826         stream->pending_cb_id =
1827             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1828             wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1829             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1830         return FALSE;
1831       }
1832     }
1833
1834     if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1835       GST_ERROR_OBJECT (demux,
1836           "Failed to begin fragment download for stream %p", stream);
1837       return FALSE;
1838     }
1839   }
1840
1841   /* Cast to int avoids a compiler warning that
1842    * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1843   switch ((int) ret) {
1844     case GST_FLOW_OK:
1845       break;                    /* all is good, let's go */
1846     case GST_FLOW_EOS:
1847       GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1848       stream->last_ret = ret;
1849       gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1850       return FALSE;
1851     case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1852       GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1853       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1854       gst_adaptive_demux_handle_lost_sync (demux);
1855       return FALSE;
1856     case GST_FLOW_NOT_LINKED:
1857     {
1858       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1859
1860       if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1861           == GST_FLOW_NOT_LINKED) {
1862         GST_ELEMENT_FLOW_ERROR (demux, ret);
1863       }
1864     }
1865       break;
1866
1867     case GST_FLOW_FLUSHING:
1868       /* Flushing is normal, the target track might have been unselected */
1869       GST_DEBUG_OBJECT (stream, "Got flushing return. Stopping callback.");
1870       return FALSE;
1871     default:
1872       if (ret <= GST_FLOW_ERROR) {
1873         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1874         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1875           gst_adaptive_demux2_stream_error (stream);
1876           return FALSE;
1877         }
1878
1879         g_clear_error (&stream->last_error);
1880
1881         /* First try to update the playlist for non-live playlists
1882          * in case the URIs have changed in the meantime. But only
1883          * try it the first time, after that we're going to wait a
1884          * a bit to not flood the server */
1885         if (stream->download_error_count == 1
1886             && !gst_adaptive_demux_is_live (demux)) {
1887           /* TODO hlsdemux had more options to this function (boolean and err) */
1888           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1889             /* Retry immediately, the playlist actually has changed */
1890             GST_DEBUG_OBJECT (demux, "Updated the playlist");
1891             return TRUE;
1892           }
1893         }
1894
1895         /* Wait half the fragment duration before retrying */
1896         GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1897         g_assert (stream->pending_cb_id == 0);
1898         stream->pending_cb_id =
1899             gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1900             stream->fragment.duration / 2,
1901             (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1902             gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1903         return FALSE;
1904       }
1905       break;
1906   }
1907
1908   return FALSE;
1909 }
1910
1911 static gboolean
1912 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1913 {
1914   GstAdaptiveDemux *demux = stream->demux;
1915   gboolean end_of_manifest = FALSE;
1916
1917   GST_LOG_OBJECT (stream, "Looking for next download");
1918
1919   /* Restarting download, figure out new position
1920    * FIXME : Move this to a separate function ? */
1921   if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1922     GstClockTimeDiff stream_time = 0;
1923
1924     GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1925
1926     if (stream->parsebin_sink != NULL) {
1927       /* If the parsebin already exists, we need to clear it out (if it doesn't,
1928        * this is the first time we've used this stream, so it's all good) */
1929       gst_adaptive_demux2_stream_push_event (stream,
1930           gst_event_new_flush_start ());
1931       gst_adaptive_demux2_stream_push_event (stream,
1932           gst_event_new_flush_stop (FALSE));
1933     }
1934
1935     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1936     stream_time = stream->start_position;
1937
1938     GST_DEBUG_OBJECT (stream, "Restarting stream at "
1939         "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1940
1941     if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1942       /* TODO check return */
1943       gst_adaptive_demux2_stream_seek (stream, demux->segment.rate >= 0,
1944           0, stream_time, &stream_time);
1945       stream->current_position = stream->start_position;
1946
1947       GST_DEBUG_OBJECT (stream,
1948           "stream_time after restart seek: %" GST_STIME_FORMAT
1949           " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1950           GST_STIME_ARGS (stream->current_position));
1951     }
1952
1953     /* Trigger (re)computation of the parsebin input segment */
1954     stream->compute_segment = TRUE;
1955
1956     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1957
1958     stream->discont = TRUE;
1959     stream->need_header = TRUE;
1960     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1961   }
1962
1963   /* Check if we're done with our segment */
1964   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1965   if (demux->segment.rate > 0) {
1966     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1967         && stream->current_position >= demux->segment.stop) {
1968       end_of_manifest = TRUE;
1969     }
1970   } else {
1971     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1972         && stream->current_position <= demux->segment.start) {
1973       end_of_manifest = TRUE;
1974     }
1975   }
1976   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1977
1978   if (end_of_manifest) {
1979     gst_adaptive_demux2_stream_end_of_manifest (stream);
1980     return FALSE;
1981   }
1982   return gst_adaptive_demux2_stream_load_a_fragment (stream);
1983 }
1984
1985 static gboolean
1986 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1987 {
1988   GstAdaptiveDemux2StreamClass *klass =
1989       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
1990
1991   if (!klass->can_start)
1992     return TRUE;
1993   return klass->can_start (stream);
1994 }
1995
1996 /**
1997  * gst_adaptive_demux2_stream_start:
1998  * @stream: a #GstAdaptiveDemux2Stream
1999  *
2000  * Start the given @stream. Should be called by subclasses that previously
2001  * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
2002  */
2003 void
2004 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
2005 {
2006   GstAdaptiveDemux *demux;
2007
2008   g_return_if_fail (stream && stream->demux);
2009
2010   demux = stream->demux;
2011
2012   if (stream->pending_cb_id != 0 || stream->download_active) {
2013     /* There is already something active / pending on this stream */
2014     GST_LOG_OBJECT (stream, "Stream already running");
2015     return;
2016   }
2017
2018   /* Some streams require a delayed start, i.e. they need more information
2019    * before they can actually be started */
2020   if (!gst_adaptive_demux2_stream_can_start (stream)) {
2021     GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
2022     return;
2023   }
2024
2025   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
2026     GST_LOG_OBJECT (stream, "Stream is EOS already");
2027     return;
2028   }
2029
2030   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2031       stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
2032     GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
2033         stream->state);
2034     stream->last_ret = GST_FLOW_OK;
2035
2036     if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2037       stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
2038   }
2039
2040   GST_LOG_OBJECT (stream, "Scheduling next_download() call");
2041   stream->pending_cb_id =
2042       gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2043       (GSourceFunc) gst_adaptive_demux2_stream_next_download,
2044       gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
2045 }
2046
2047 void
2048 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
2049 {
2050   GstAdaptiveDemux *demux = stream->demux;
2051
2052   GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
2053   stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
2054
2055   if (stream->pending_cb_id != 0) {
2056     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2057         stream->pending_cb_id);
2058     stream->pending_cb_id = 0;
2059   }
2060
2061   /* Cancel and drop the existing download request */
2062   downloadhelper_cancel_request (demux->download_helper,
2063       stream->download_request);
2064   download_request_unref (stream->download_request);
2065   stream->downloading_header = stream->downloading_index = FALSE;
2066   stream->download_request = download_request_new ();
2067   stream->download_active = FALSE;
2068
2069   stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
2070 }
2071
2072 gboolean
2073 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2074 {
2075   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2076     return FALSE;
2077   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2078     return FALSE;
2079   if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
2080     return FALSE;
2081   return TRUE;
2082 }
2083
2084 gboolean
2085 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
2086 {
2087   GList *tmp;
2088
2089   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2090     GstAdaptiveDemuxTrack *track = tmp->data;
2091     if (track->selected)
2092       return TRUE;
2093   }
2094
2095   return FALSE;
2096 }
2097
2098 /**
2099  * gst_adaptive_demux2_stream_is_selected:
2100  * @stream: A #GstAdaptiveDemux2Stream
2101  *
2102  * Returns: %TRUE if any of the tracks targetted by @stream is selected
2103  */
2104 gboolean
2105 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
2106 {
2107   gboolean ret;
2108
2109   g_return_val_if_fail (stream && stream->demux, FALSE);
2110
2111   TRACKS_LOCK (stream->demux);
2112   ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2113   TRACKS_UNLOCK (stream->demux);
2114
2115   return ret;
2116 }
2117
2118 /* Called from the scheduler task */
2119 GstClockTime
2120 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux2Stream *
2121     stream)
2122 {
2123   GstAdaptiveDemux2StreamClass *klass =
2124       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2125
2126   if (klass->get_presentation_offset == NULL)
2127     return 0;
2128
2129   return klass->get_presentation_offset (stream);
2130 }
2131
2132 GstFlowReturn
2133 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux2Stream *
2134     stream)
2135 {
2136   GstAdaptiveDemux2StreamClass *klass =
2137       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2138   GstFlowReturn ret;
2139
2140   g_return_val_if_fail (klass->update_fragment_info != NULL, GST_FLOW_ERROR);
2141
2142   /* Make sure the sub-class will update bitrate, or else
2143    * we will later */
2144   stream->fragment.finished = FALSE;
2145
2146   GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
2147       GST_TIME_ARGS (stream->current_position));
2148
2149   ret = klass->update_fragment_info (stream);
2150
2151   GST_LOG_OBJECT (stream, "ret:%s uri:%s",
2152       gst_flow_get_name (ret), stream->fragment.uri);
2153   if (ret == GST_FLOW_OK) {
2154     GST_LOG_OBJECT (stream,
2155         "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
2156         GST_STIME_ARGS (stream->fragment.stream_time),
2157         GST_TIME_ARGS (stream->fragment.duration));
2158     GST_LOG_OBJECT (stream,
2159         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
2160         stream->fragment.range_start, stream->fragment.range_end);
2161   }
2162
2163   return ret;
2164 }
2165
2166 static GstFlowReturn
2167 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
2168     stream, GstBuffer * buffer)
2169 {
2170   return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
2171 }
2172
2173 static GstFlowReturn
2174 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
2175     stream)
2176 {
2177   /* No need to advance, this isn't a real fragment */
2178   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2179     return GST_FLOW_OK;
2180
2181   return gst_adaptive_demux2_stream_advance_fragment (stream,
2182       stream->fragment.duration);
2183 }
2184
2185 /* must be called from the scheduler */
2186 gboolean
2187 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux2Stream * stream)
2188 {
2189   GstAdaptiveDemux2StreamClass *klass =
2190       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2191   gboolean ret = TRUE;
2192
2193   if (klass->has_next_fragment)
2194     ret = klass->has_next_fragment (stream);
2195
2196   return ret;
2197 }
2198
2199 /* must be called from the scheduler */
2200 GstFlowReturn
2201 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux2Stream * stream,
2202     gboolean forward, GstSeekFlags flags,
2203     GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
2204 {
2205   GstAdaptiveDemux2StreamClass *klass =
2206       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2207
2208   if (klass->stream_seek)
2209     return klass->stream_seek (stream, forward, flags, ts, final_ts);
2210   return GST_FLOW_ERROR;
2211 }
2212
2213 static gboolean
2214 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
2215     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
2216 {
2217   GstAdaptiveDemux2StreamClass *klass =
2218       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2219
2220   if (klass->select_bitrate)
2221     return klass->select_bitrate (stream, bitrate);
2222   return FALSE;
2223 }
2224
2225 GstClockTime
2226 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux2Stream *
2227     stream)
2228 {
2229   GstAdaptiveDemux2StreamClass *klass =
2230       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2231
2232   if (klass->get_fragment_waiting_time)
2233     return klass->get_fragment_waiting_time (stream);
2234   return 0;
2235 }
2236
2237 /* must be called from the scheduler */
2238 /* Called from: the ::finish_fragment() handlers when an *actual* fragment is
2239  * done
2240  *
2241  * @duration: Is the duration of the advancement starting from
2242  * stream->current_position which might not be the fragment duration after a
2243  * seek.
2244  */
2245 GstFlowReturn
2246 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux2Stream * stream,
2247     GstClockTime duration)
2248 {
2249   if (stream->last_ret != GST_FLOW_OK)
2250     return stream->last_ret;
2251
2252   GstAdaptiveDemux2StreamClass *klass =
2253       GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2254   GstAdaptiveDemux *demux = stream->demux;
2255   GstFlowReturn ret = GST_FLOW_OK;
2256
2257   g_assert (klass->advance_fragment != NULL);
2258
2259   GST_LOG_OBJECT (stream,
2260       "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
2261       GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
2262
2263   stream->download_error_count = 0;
2264   g_clear_error (&stream->last_error);
2265
2266 #if 0
2267   /* FIXME - url has no indication of byte ranges for subsegments */
2268   /* FIXME: Reenable statistics sending? */
2269   gst_element_post_message (GST_ELEMENT_CAST (demux),
2270       gst_message_new_element (GST_OBJECT_CAST (demux),
2271           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
2272               "manifest-uri", G_TYPE_STRING,
2273               demux->manifest_uri, "uri", G_TYPE_STRING,
2274               stream->fragment.uri, "fragment-start-time",
2275               GST_TYPE_CLOCK_TIME, stream->download_start_time,
2276               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
2277               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
2278               stream->download_total_bytes, "fragment-download-time",
2279               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
2280 #endif
2281
2282   /* Don't update to the end of the segment if in reverse playback */
2283   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2284   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
2285     stream->parse_segment.position += duration;
2286     stream->current_position += duration;
2287
2288     GST_DEBUG_OBJECT (stream,
2289         "stream position now %" GST_TIME_FORMAT,
2290         GST_TIME_ARGS (stream->current_position));
2291   }
2292   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2293
2294   /* When advancing with a non 1.0 rate on live streams, we need to check
2295    * the live seeking range again to make sure we can still advance to
2296    * that position */
2297   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
2298     if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
2299       ret = GST_FLOW_EOS;
2300     else
2301       ret = klass->advance_fragment (stream);
2302   } else if (gst_adaptive_demux_is_live (demux)
2303       || gst_adaptive_demux2_stream_has_next_fragment (stream)) {
2304     ret = klass->advance_fragment (stream);
2305   } else {
2306     ret = GST_FLOW_EOS;
2307   }
2308
2309   stream->download_start_time =
2310       GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
2311
2312   /* Always check if we need to switch bitrate on OK, or when live
2313    * (it's normal to have EOS on advancing in live when we hit the
2314    * end of the manifest) */
2315   if (ret == GST_FLOW_OK || gst_adaptive_demux_is_live (demux)) {
2316     GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
2317     if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
2318             gst_adaptive_demux2_stream_update_current_bitrate (stream))) {
2319       GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
2320       stream->need_header = TRUE;
2321       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
2322     }
2323   }
2324
2325   stream->last_ret = ret;
2326   return stream->last_ret;
2327 }
2328
2329 /* TRACKS_LOCK held */
2330 static GstAdaptiveDemuxTrack *
2331 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
2332     GstStreamType stream_type)
2333 {
2334   GList *iter;
2335
2336   for (iter = stream->tracks; iter; iter = iter->next) {
2337     GstAdaptiveDemuxTrack *track = iter->data;
2338
2339     if (track->type == stream_type)
2340       return track;
2341   }
2342
2343   return NULL;
2344 }
2345
2346 /* TRACKS lock held */
2347 static void
2348 gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream * stream)
2349 {
2350   guint i;
2351
2352   GST_DEBUG_OBJECT (stream, "Updating track information from collection");
2353
2354   for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
2355       i++) {
2356     GstStream *gst_stream =
2357         gst_stream_collection_get_stream (stream->stream_collection, i);
2358     GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
2359     GstAdaptiveDemuxTrack *track;
2360
2361     if (stream_type == GST_STREAM_TYPE_UNKNOWN)
2362       continue;
2363     track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
2364     if (!track) {
2365       GST_DEBUG_OBJECT (stream,
2366           "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
2367           gst_stream);
2368       continue;
2369     }
2370
2371     if (track->upstream_stream_id)
2372       g_free (track->upstream_stream_id);
2373     track->upstream_stream_id =
2374         g_strdup (gst_stream_get_stream_id (gst_stream));
2375   }
2376
2377 }
2378
2379 static gboolean
2380 tags_have_language_info (GstTagList * tags)
2381 {
2382   const gchar *language = NULL;
2383
2384   if (tags == NULL)
2385     return FALSE;
2386
2387   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
2388           &language))
2389     return TRUE;
2390   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
2391           &language))
2392     return TRUE;
2393
2394   return FALSE;
2395 }
2396
2397 static gboolean
2398 can_handle_collection (GstAdaptiveDemux2Stream * stream,
2399     GstStreamCollection * collection)
2400 {
2401   guint i;
2402   guint nb_audio, nb_video, nb_text;
2403   gboolean have_audio_languages = TRUE;
2404   gboolean have_text_languages = TRUE;
2405
2406   nb_audio = nb_video = nb_text = 0;
2407
2408   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
2409     GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
2410     GstTagList *tags = gst_stream_get_tags (gst_stream);
2411
2412     GST_DEBUG_OBJECT (stream,
2413         "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
2414     switch (gst_stream_get_stream_type (gst_stream)) {
2415       case GST_STREAM_TYPE_AUDIO:
2416         have_audio_languages &= tags_have_language_info (tags);
2417         nb_audio++;
2418         break;
2419       case GST_STREAM_TYPE_VIDEO:
2420         nb_video++;
2421         break;
2422       case GST_STREAM_TYPE_TEXT:
2423         have_text_languages &= tags_have_language_info (tags);
2424         nb_text++;
2425         break;
2426       default:
2427         break;
2428     }
2429     if (tags)
2430       gst_tag_list_unref (tags);
2431   }
2432
2433   /* Check that we either have at most 1 of each track type, or that
2434    * we have language tags for each to tell which is which */
2435   if (nb_video > 1 ||
2436       (nb_audio > 1 && !have_audio_languages) ||
2437       (nb_text > 1 && !have_text_languages)) {
2438     GST_WARNING
2439         ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
2440         nb_audio, nb_video, nb_text);
2441     return FALSE;
2442   }
2443
2444   return TRUE;
2445 }
2446
2447 /* Called from the demuxer when it receives a GstStreamCollection on the bus
2448  * for this stream. */
2449 /* TRACKS lock held */
2450 gboolean
2451 gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream * stream,
2452     GstStreamCollection * collection, gboolean * had_pending_tracks)
2453 {
2454   g_assert (had_pending_tracks != NULL);
2455
2456   /* Check whether the collection is "sane" or not.
2457    *
2458    * In the context of adaptive streaming, we can only handle multiplexed
2459    * content where the output sub-streams can be matched reliably to the various
2460    * tracks. That is, only a single stream of each type, or if there are
2461    * multiple audio/subtitle tracks, they can be differentiated by language
2462    * (and possibly in the future by codec).
2463    */
2464   if (!can_handle_collection (stream, collection)) {
2465     return FALSE;
2466   }
2467
2468   /* Store the collection on the stream */
2469   gst_object_replace ((GstObject **) & stream->stream_collection,
2470       (GstObject *) collection);
2471
2472   /* If stream is marked as having pending_tracks, ask the subclass to
2473    * handle that and create the tracks now */
2474   if (stream->pending_tracks) {
2475     GstAdaptiveDemux2StreamClass *klass =
2476         GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
2477     g_assert (klass->create_tracks);
2478     klass->create_tracks (stream);
2479     stream->pending_tracks = FALSE;
2480     *had_pending_tracks = TRUE;
2481   } else {
2482     g_assert (stream->tracks);
2483
2484     /* Now we should have assigned tracks, match them to the
2485      * collection and update the pending upstream stream_id
2486      * for each of them based on the collection information. */
2487     gst_adaptive_demux2_stream_update_track_ids (stream);
2488   }
2489
2490   return TRUE;
2491 }
2492
2493 static guint64
2494 _update_average_bitrate (GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2495 {
2496   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2497
2498   stream->moving_bitrate -= stream->fragment_bitrates[index];
2499   stream->fragment_bitrates[index] = new_bitrate;
2500   stream->moving_bitrate += new_bitrate;
2501
2502   stream->moving_index += 1;
2503
2504   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2505     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2506   return stream->moving_bitrate / stream->moving_index;
2507 }
2508
2509 guint64
2510 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
2511     stream)
2512 {
2513   guint64 average_bitrate;
2514   guint64 fragment_bitrate;
2515   guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2516
2517   fragment_bitrate = stream->last_bitrate;
2518   GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2519       fragment_bitrate);
2520
2521   average_bitrate = _update_average_bitrate (stream, fragment_bitrate);
2522
2523   GST_INFO_OBJECT (stream,
2524       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2525   GST_INFO_OBJECT (stream,
2526       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2527       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2528
2529   /* Conservative approach, make sure we don't upgrade too fast */
2530   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2531
2532   /* For the video stream, update the demuxer reported download
2533    * rate. FIXME: Move all bandwidth estimation to the
2534    * download helper and make it the demuxer's responsibility
2535    * to select the right set of things to download within
2536    * that bandwidth */
2537   GstAdaptiveDemux *demux = stream->demux;
2538   GST_OBJECT_LOCK (demux);
2539
2540   /* If this is stream containing our video, update the overall demuxer
2541    * reported bitrate and notify, to give the application a
2542    * chance to choose a new connection-bitrate */
2543   if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
2544     demux->current_download_rate = stream->current_download_rate;
2545     GST_OBJECT_UNLOCK (demux);
2546     g_object_notify (G_OBJECT (demux), "current-bandwidth");
2547     GST_OBJECT_LOCK (demux);
2548   }
2549
2550   connection_speed = demux->connection_speed;
2551   min_bitrate = demux->min_bitrate;
2552   max_bitrate = demux->max_bitrate;
2553   GST_OBJECT_UNLOCK (demux);
2554
2555   if (connection_speed) {
2556     GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
2557         connection_speed / 1000);
2558     return connection_speed;
2559   }
2560
2561   /* No explicit connection_speed, so choose the new variant to use as a
2562    * fraction of the measured download rate */
2563   target_download_rate =
2564       CLAMP (stream->current_download_rate, 0,
2565       G_MAXUINT) * demux->bandwidth_target_ratio;
2566
2567   GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
2568       demux->bandwidth_target_ratio, target_download_rate);
2569
2570 #if 0
2571   /* Debugging code, modulate the bitrate every few fragments */
2572   {
2573     static guint ctr = 0;
2574     if (ctr % 3 == 0) {
2575       GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
2576       target_download_rate /= 2;
2577     }
2578     ctr++;
2579   }
2580 #endif
2581
2582   if (min_bitrate > 0 && target_download_rate < min_bitrate) {
2583     target_download_rate = min_bitrate;
2584     GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
2585         min_bitrate);
2586   }
2587
2588   if (max_bitrate > 0 && target_download_rate > max_bitrate) {
2589     target_download_rate = max_bitrate;
2590     GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
2591         max_bitrate);
2592   }
2593
2594   GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
2595       target_download_rate);
2596
2597   return target_download_rate;
2598 }
2599
2600 void
2601 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
2602 {
2603   g_free (f->uri);
2604   f->uri = NULL;
2605   f->range_start = 0;
2606   f->range_end = -1;
2607
2608   g_free (f->header_uri);
2609   f->header_uri = NULL;
2610   f->header_range_start = 0;
2611   f->header_range_end = -1;
2612
2613   g_free (f->index_uri);
2614   f->index_uri = NULL;
2615   f->index_range_start = 0;
2616   f->index_range_end = -1;
2617
2618   f->stream_time = GST_CLOCK_STIME_NONE;
2619   f->duration = GST_CLOCK_TIME_NONE;
2620   f->finished = FALSE;
2621 }