3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
/* Forward declarations of file-local functions. */
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
/* Expose the parent-class pointer generated by the type definition below
 * under the short name `parent_class` (used when chaining up in finalize). */
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
/* GObject class initialiser: installs gst_adaptive_demux2_stream_finalize()
 * as the finalize handler for the class. */
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
49 GObjectClass *gobject_class = (GObjectClass *) klass;
51 gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
/* File-local GType cache, 0 until assigned.
 * NOTE(review): presumably holds the type of the tsdemux element once it has
 * been looked up - the code that fills and reads it is not shown here;
 * confirm. */
54 static GType tsdemux_type = 0;
/* Instance initialiser. Puts a new stream into its idle defaults: a fresh
 * download request, STOPPED state, GST_FLOW_OK as the last flow return, no
 * scheduled input wakeup, a zero-filled bitrate history, unset start/current
 * positions and a TIME-format parse segment. */
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
59 stream->download_request = download_request_new ();
60 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61 stream->last_ret = GST_FLOW_OK;
62 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
/* Zero-initialised array of NUM_LOOKBACK_FRAGMENTS guint64 slots; released
 * with g_free() in finalize. */
64 stream->fragment_bitrates =
65 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
67 stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
69 gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
72 /* must be called with manifest_lock taken.
73 * It will temporarily drop the manifest_lock in order to join the task.
74 * It will join only the old_streams (the demux->streams are joined by
75 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
/* GObject finalize handler: releases everything still owned by the stream
 * (download request, last error, fragment info, pending events/caps/tags,
 * parsebin and its sink pad, bitrate history, track references and the
 * stream collection) and then chains up to the parent class. */
79 gst_adaptive_demux2_stream_finalize (GObject * object)
81 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
83 GST_LOG_OBJECT (object, "Finalizing");
85 if (stream->download_request)
86 download_request_unref (stream->download_request);
88 g_clear_error (&stream->last_error);
90 gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
/* Each queued event is unreffed as the list is freed. */
92 if (stream->pending_events) {
93 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
94 stream->pending_events = NULL;
97 if (stream->parsebin_sink) {
98 gst_object_unref (stream->parsebin_sink);
99 stream->parsebin_sink = NULL;
/* NOTE(review): the handlers are disconnected from stream->parsebin without
 * a NULL check - this assumes the ids are only non-zero while parsebin is
 * set; confirm. */
102 if (stream->pad_added_id)
103 g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
104 if (stream->pad_removed_id)
105 g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
/* Take parsebin out of the demuxer bin, set it to NULL state and drop our
 * reference to it. */
107 if (stream->parsebin != NULL) {
108 GST_LOG_OBJECT (stream, "Removing parsebin");
109 gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
110 gst_element_set_state (stream->parsebin, GST_STATE_NULL);
111 gst_object_unref (stream->parsebin);
112 stream->parsebin = NULL;
115 g_free (stream->fragment_bitrates);
/* Drop the reference held on every track in the list (one is taken per
 * track in gst_adaptive_demux2_stream_add_track()). */
117 g_list_free_full (stream->tracks,
118 (GDestroyNotify) gst_adaptive_demux_track_unref);
120 if (stream->pending_caps)
121 gst_caps_unref (stream->pending_caps);
123 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
124 g_clear_pointer (&stream->stream_collection, gst_object_unref);
/* Chain up last, once all stream-owned resources are gone. */
126 G_OBJECT_CLASS (parent_class)->finalize (object);
130 * gst_adaptive_demux2_stream_add_track:
131 * @stream: A #GstAdaptiveDemux2Stream
132 * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
134 * This function is called when a subclass knows of a target @track that this
135 * @stream can provide.
/* Registers @track with @stream: picks an initial buffering threshold for
 * the track, stores a new reference to it in stream->tracks and adds the
 * track to the stream's period. A NULL @track is rejected with FALSE; a
 * track that is already in stream->tracks is detected and logged. */
138 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
139 GstAdaptiveDemuxTrack * track)
141 g_return_val_if_fail (track != NULL, FALSE);
143 GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
145 if (g_list_find (stream->tracks, track)) {
146 GST_DEBUG_OBJECT (stream->demux,
147 "track '%s' already handled by this stream", track->stream_id);
/* Threshold priority: the demuxer's low watermark time when non-zero, else
 * the stream's recommended threshold capped at 10 seconds, else a 10 second
 * default. */
151 if (stream->demux->buffering_low_watermark_time)
152 track->buffering_threshold = stream->demux->buffering_low_watermark_time;
153 else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
154 track->buffering_threshold =
155 MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
157 /* Using a starting default, can be overridden later in
158 * ::update_stream_info() */
159 GST_DEBUG_OBJECT (stream,
160 "Setting default 10s buffering threshold on new track");
161 track->buffering_threshold = 10 * GST_SECOND;
/* The list keeps its own reference on the track (released in finalize).
 * NOTE(review): g_list_append() returns the new list head, which has to be
 * stored back into stream->tracks - confirm the assignment is present. */
165 g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
/* A stream must already belong to a period before tracks are added. */
167 g_assert (stream->period);
168 gst_adaptive_demux_period_add_track (stream->period, track);
/* Forward declarations of stream helpers that the functions below call
 * before their definitions appear (next_download, load_a_fragment,
 * handle_playlist_eos, begin_download_uri). */
174 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
176 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
178 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
181 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
182 GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
/* Debug-only helper (compiled only when GST_DISABLE_GST_DEBUG is not
 * defined). Produces the string used for the "%s" download-kind label in log
 * messages, chosen from the stream's downloading_header and
 * downloading_index flags, with the header flag checked first. */
185 #ifndef GST_DISABLE_GST_DEBUG
187 uritype (GstAdaptiveDemux2Stream * s)
189 if (s->downloading_header)
191 if (s->downloading_index)
/* @stream: stream whose chunked download request has just completed
 *
 * Works out the byte range that follows the completed request and starts a
 * download for it. The next range begins right after the completed chunk
 * and spans at most stream->fragment.chunk_size bytes, clipped to
 * stream->fragment.range_end when that is set (-1 is treated as "not set"
 * for both values). If starting the download fails the stream is stopped. */
197 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
199 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
201 GstAdaptiveDemux *demux = stream->demux;
202 DownloadRequest *request = stream->download_request;
/* Snapshot of the request that just finished. */
205 gchar *uri = request->uri;
206 gint64 range_start = request->range_start;
207 gint64 range_end = request->range_end;
212 return FALSE; /* This was a request to the end, no more to load */
214 /* The size of the request that just completed: */
215 chunk_size = range_end + 1 - range_start;
217 if (request->content_received < chunk_size)
218 return FALSE; /* Short read - we're done */
220 /* Accumulate the data we just fetched, to figure out the next
221 * request start position and update the target chunk size from
222 * the updated stream fragment info */
223 range_start += chunk_size;
224 range_end = stream->fragment.range_end;
225 chunk_size = stream->fragment.chunk_size;
228 return FALSE; /* Sub-class doesn't want another chunk */
230 /* HTTP ranges are inclusive for the end */
231 if (chunk_size != -1) {
232 chunk_end = range_start + chunk_size - 1;
233 if (range_end != -1 && range_end < chunk_end)
234 chunk_end = range_end;
236 chunk_end = range_end;
239 GST_DEBUG_OBJECT (stream,
240 "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
241 " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
/* Issue the request for [range_start, chunk_end] on the same URI. */
244 gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
245 range_start, chunk_end);
246 if (ret != GST_FLOW_OK) {
247 GST_DEBUG_OBJECT (stream,
248 "Stopping stream due to begin download failure - ret %s",
249 gst_flow_get_name (ret));
250 gst_adaptive_demux2_stream_stop (stream);
/* Walks the tracks of @stream and, for every track that is not selected,
 * drains its queued data up to the demuxer's global output position.
 * TRACKS_UNLOCK() is called once the walk is finished. */
258 drain_inactive_tracks (GstAdaptiveDemux * demux,
259 GstAdaptiveDemux2Stream * stream)
264 for (iter = stream->tracks; iter; iter = iter->next) {
265 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
266 if (!track->selected) {
267 gst_adaptive_demux_track_drain_to (track,
268 demux->priv->global_output_position);
272 TRACKS_UNLOCK (demux);
/* @stream: stream whose download ended
 * @ret: flow return describing how the download ended
 * @err: error to remember on the stream; a copy is stored in
 *       stream->last_error (presumably optional - TODO confirm NULL is
 *       accepted)
 *
 * Records the result on the stream, lets the subclass finish the fragment
 * where appropriate, and then either stops the stream, handles end of
 * playlist, continues with another chunk, or schedules the next download on
 * the scheduler loop. */
275 /* Called to complete a download, either due to failure or completion
276 * Should set up the next download if necessary */
278 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
279 stream, GstFlowReturn ret, GError * err)
281 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
282 GstAdaptiveDemux *demux = stream->demux;
284 GST_DEBUG_OBJECT (stream,
285 "%s download finish: %d %s - err: %p", uritype (stream), ret,
286 gst_flow_get_name (ret), err);
288 stream->download_finished = TRUE;
290 /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
291 * which can look at the last_ret - so make sure it's stored before calling that.
292 * Also, for not-linked or other errors passed in that are going to make
293 * this stream stop, we'll need to store it */
294 stream->last_ret = ret;
297 g_clear_error (&stream->last_error);
298 stream->last_error = g_error_copy (err);
301 /* For actual errors, stop now, no need to call finish_fragment and get
302 * confused if it returns a non-error status, but if EOS was passed in,
303 * continue and check whether finish_fragment() says we've finished
304 * the whole manifest or just this fragment */
305 if (ret < 0 && ret != GST_FLOW_EOS) {
306 GST_INFO_OBJECT (stream,
307 "Stopping stream due to error ret %s", gst_flow_get_name (ret));
308 gst_adaptive_demux2_stream_stop (stream);
312 /* Handle all the possible flow returns here: */
313 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
314 /* We lost sync, seek back to live and return */
315 GST_WARNING_OBJECT (stream, "Lost sync when downloading");
316 gst_adaptive_demux_handle_lost_sync (demux);
318 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
319 /* The sub-class wants to stop the fragment immediately */
320 stream->fragment.finished = TRUE;
321 ret = klass->finish_fragment (demux, stream);
323 GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
324 gst_flow_get_name (ret));
325 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
326 GST_DEBUG_OBJECT (stream, "Restarting download as requested");
327 /* Just mark the fragment as finished */
328 stream->fragment.finished = TRUE;
/* No chunking in effect (no need_another_chunk vfunc, chunk_size of -1 or
 * 0, or the subclass declines another chunk): the fragment is complete. */
330 } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
331 || !klass->need_another_chunk (stream)
332 || stream->fragment.chunk_size == 0) {
333 stream->fragment.finished = TRUE;
334 ret = klass->finish_fragment (stream->demux, stream);
336 GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
337 gst_flow_get_name (ret));
338 } else if (stream->fragment.chunk_size != 0
339 && schedule_another_chunk (stream)) {
340 /* Another download has already begun, no need to queue anything below */
344 /* For HLS, we might be enqueueing data into tracks that aren't
345 * selected. Drain those ones out */
346 drain_inactive_tracks (stream->demux, stream);
348 /* Now that we've called finish_fragment we can clear these flags the
349 * sub-class might have checked */
350 if (stream->downloading_header) {
351 stream->need_header = FALSE;
352 stream->downloading_header = FALSE;
353 } else if (stream->downloading_index) {
354 stream->need_index = FALSE;
355 stream->downloading_index = FALSE;
356 /* Restart the fragment again now that header + index were loaded
357 * so that get_fragment_info() will be called again */
358 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
360 /* Finishing a fragment data download. Try for another */
361 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
364 /* if GST_FLOW_EOS was passed in that means this download is finished,
365 * but it's the result returned from finish_fragment() we really care
366 * about, as that tells us if the manifest has run out of fragments
368 if (ret == GST_FLOW_EOS) {
369 stream->last_ret = ret;
371 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
375 /* Now finally, if ret is anything other than success, we should stop this
378 GST_DEBUG_OBJECT (stream,
379 "Stopping stream due to finish fragment ret %s",
380 gst_flow_get_name (ret));
381 gst_adaptive_demux2_stream_stop (stream);
385 /* Clear the last_ret marker before starting a fresh download */
386 stream->last_ret = GST_FLOW_OK;
388 GST_LOG_OBJECT (stream, "Scheduling next_download() call");
/* The queued callback holds its own reference on the stream, released by
 * the destroy notify; the returned id is kept in pending_cb_id. */
389 stream->pending_cb_id =
390 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
391 (GSourceFunc) gst_adaptive_demux2_stream_next_download,
392 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* Aborts the download in progress after a parsing failure: only acts while
 * the stream is in the DOWNLOADING state, cancels the active request,
 * replaces it with a fresh one, and completes the download with
 * GST_FLOW_CUSTOM_ERROR via gst_adaptive_demux2_stream_finish_download(). */
395 /* Must be called from the scheduler context */
397 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
400 GstAdaptiveDemux *demux = stream->demux;
402 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
405 downloadhelper_cancel_request (demux->download_helper,
406 stream->download_request);
408 /* cancellation is async, so recycle our download request to avoid races */
409 download_request_unref (stream->download_request);
410 stream->download_request = download_request_new ();
412 gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
/* @demux: the demuxer whose segment is the starting point
 * @stream: the stream whose parse_segment is (re)built
 * @first_and_live: TRUE when the demuxer has already seeked to the current
 *                  (live) position
 *
 * Copies demux->segment into stream->parse_segment and adjusts start, stop,
 * position, base and time for the current period start and the stream's
 * presentation offset, then sets stream->send_segment so the new segment
 * gets sent. */
417 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
418 GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
420 GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
421 GstClockTime offset =
422 gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
424 stream->parse_segment = demux->segment;
426 /* The demuxer segment is just built from seek events, but for each stream
427 * we have to adjust segments according to the current period and the
428 * stream specific presentation time offset.
430 * For each period, buffer timestamps start again from 0. Additionally the
431 * buffer timestamps are shifted by the stream specific presentation time
432 * offset, so the first buffer timestamp of a period is 0 + presentation
433 * time offset. If the stream contains timestamps itself, this is also
434 * supposed to be the presentation time stored inside the stream.
436 * The stream time over periods is supposed to be continuous, that is the
437 * buffer timestamp 0 + presentation time offset should map to the start
438 * time of the current period.
441 * The adjustment of the stream segments as such works the following.
443 * If the demuxer segment start is bigger than the period start, this
444 * means that we have to drop some media at the beginning of the current
445 * period, e.g. because a seek into the middle of the period has
446 * happened. The amount of media to drop is the difference between the
447 * period start and the demuxer segment start, and as each period starts
448 * again from 0, this difference is going to be the actual stream's
449 * segment start. As all timestamps of the stream are shifted by the
450 * presentation time offset, we will also have to move the segment start
453 * Likewise, the demuxer segment stop value is adjusted in the same
456 * Now the running time and stream time at the stream's segment start has
457 * to be the one that is stored inside the demuxer's segment, which means
458 * that segment.base and segment.time have to be copied over (done just
462 * If the demuxer segment start is smaller than the period start time,
463 * this means that the whole period is inside the segment. As each period
464 * starts timestamps from 0, and additionally timestamps are shifted by
465 * the presentation time offset, the stream's first timestamp (and as such
466 * the stream's segment start) has to be the presentation time offset.
467 * The stream time at the segment start is supposed to be the stream time
468 * of the period start according to the demuxer segment, so the stream
469 * segment's time would be set to that. The same goes for the stream
470 * segment's base, which is supposed to be the running time of the period
471 * start according to the demuxer's segment.
473 * The same logic applies for negative rates with the segment stop and
474 * the period stop time (which gets clamped).
477 * For the first case where not the complete period is inside the segment,
478 * the segment time and base as calculated by the second case would be
481 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
483 GST_DEBUG_OBJECT (demux,
484 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
485 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
487 * Since stream->parse_segment is initially a copy of demux->segment,
488 * only the values that need updating are modified below. */
489 if (first_and_live) {
490 /* If first and live, demuxer did seek to the current position already */
491 stream->parse_segment.start = demux->segment.start - period_start + offset;
492 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
493 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
494 /* FIXME : Do we need to handle negative rates for this ? */
495 stream->parse_segment.position = stream->parse_segment.start;
496 } else if (demux->segment.start > period_start) {
497 /* seek within a period */
498 stream->parse_segment.start = demux->segment.start - period_start + offset;
499 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
500 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
/* Forward playback positions at the offset, reverse playback at the stop. */
501 if (stream->parse_segment.rate >= 0)
502 stream->parse_segment.position = offset;
504 stream->parse_segment.position = stream->parse_segment.stop;
/* Remaining case: the segment starts at the presentation offset, and base
 * and time are recomputed from the demuxer segment. */
506 stream->parse_segment.start = offset;
507 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
508 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
509 if (stream->parse_segment.rate >= 0) {
510 stream->parse_segment.position = offset;
511 stream->parse_segment.base =
512 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
515 stream->parse_segment.position = stream->parse_segment.stop;
516 stream->parse_segment.base =
517 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
518 period_start + demux->segment.stop - demux->segment.start);
520 stream->parse_segment.time =
521 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
/* Request that a SEGMENT event carrying parse_segment is sent with the next
 * first buffer of a fragment (checked in push_buffer). */
525 stream->send_segment = TRUE;
527 GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
528 &stream->parse_segment);
/* Stamps @buffer with a PTS taken from stream->fragment.stream_time.
 * When that stream time is valid, the stream's presentation offset is
 * fetched and the position is assigned as the buffer PTS, with a warning
 * logged when it gets clamped to 0; otherwise the PTS is set to
 * GST_CLOCK_TIME_NONE.
 * NOTE(review): how the offset is applied and the exact clamp condition
 * should be confirmed against the full function body. */
531 /* Segment lock hold */
533 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
534 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
536 GstClockTimeDiff pos;
538 GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
539 GST_STIME_ARGS (stream->fragment.stream_time));
541 pos = stream->fragment.stream_time;
543 if (GST_CLOCK_STIME_IS_VALID (pos)) {
544 GstClockTime offset =
545 gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
550 GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
554 GST_BUFFER_PTS (buffer) = pos;
556 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
559 GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
560 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
/* Feeds one downloaded buffer into the stream's parsebin sink pad.
 *
 * Before the buffer is chained, the function recomputes the parse segment
 * if requested, stamps the first buffer of a fragment with its PTS, sets or
 * clears the DISCONT flag, clears duration and DTS, and collects any
 * pending stream-start, caps, segment, tag and queued events. Those events
 * are sent in that order ahead of the buffer.
 *
 * Returns GST_FLOW_FLUSHING if the stream is found in the STOPPED state
 * after pushing; the result of gst_pad_chain() is logged as the push
 * result. */
563 /* Must be called from the scheduler context */
565 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
568 GstAdaptiveDemux *demux = stream->demux;
569 GstFlowReturn ret = GST_FLOW_OK;
570 gboolean discont = FALSE;
572 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
573 NULL, *stream_start = NULL, *buffer_gap = NULL;
574 GList *pending_events = NULL;
/* A segment recomputation was requested: rebuild parse_segment and consume
 * both one-shot flags. */
576 if (stream->compute_segment) {
577 gst_adaptive_demux2_stream_prepare_segment (demux, stream,
578 stream->first_and_live);
579 stream->compute_segment = FALSE;
580 stream->first_and_live = FALSE;
/* DROPPABLE buffers get a GAP event spanning their PTS and duration. */
583 if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
584 GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
586 gst_event_new_gap (GST_BUFFER_PTS (buffer),
587 GST_BUFFER_DURATION (buffer));
590 if (stream->first_fragment_buffer) {
591 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
592 if (demux->segment.rate < 0)
593 /* Set DISCONT flag for every first buffer in reverse playback mode
594 * as each fragment for its own has to be reversed */
596 update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
597 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
599 GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
601 /* Do we need to inject STREAM_START and SEGMENT events ?
603 * This can happen when a stream is restarted, and also when switching to a
604 * variant which needs a header (in which case downloading_header will be
607 if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
608 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
609 pending_segment = gst_event_new_segment (&stream->parse_segment);
610 gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
611 stream->send_segment = FALSE;
612 GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
613 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
614 stream_start = gst_event_new_stream_start ("bogus");
615 if (demux->have_group_id)
616 gst_event_set_group_id (stream_start, demux->group_id);
619 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
621 stream->first_fragment_buffer = FALSE;
623 if (stream->discont) {
625 stream->discont = FALSE;
629 GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
630 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
632 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
635 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
636 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
637 if (G_UNLIKELY (stream->pending_caps)) {
638 pending_caps = gst_event_new_caps (stream->pending_caps);
639 gst_caps_unref (stream->pending_caps);
640 stream->pending_caps = NULL;
643 if (G_UNLIKELY (stream->pending_tags)) {
644 GstTagList *tags = stream->pending_tags;
646 stream->pending_tags = NULL;
649 pending_tags = gst_event_new_tag (tags);
651 if (G_UNLIKELY (stream->pending_events)) {
652 pending_events = stream->pending_events;
653 stream->pending_events = NULL;
656 /* Do not push events or buffers holding the manifest lock */
657 if (G_UNLIKELY (stream_start)) {
658 GST_DEBUG_OBJECT (stream,
659 "Setting stream start: %" GST_PTR_FORMAT, stream_start);
660 gst_pad_send_event (stream->parsebin_sink, stream_start);
662 if (G_UNLIKELY (pending_caps)) {
663 GST_DEBUG_OBJECT (stream,
664 "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
665 gst_pad_send_event (stream->parsebin_sink, pending_caps);
667 if (G_UNLIKELY (pending_segment)) {
668 GST_DEBUG_OBJECT (stream,
669 "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
670 gst_pad_send_event (stream->parsebin_sink, pending_segment);
672 if (G_UNLIKELY (pending_tags)) {
673 GST_DEBUG_OBJECT (stream,
674 "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
675 gst_pad_send_event (stream->parsebin_sink, pending_tags);
/* Send the queued events in list order, removing each link once sent; a
 * failed send is logged and the loop continues. */
677 while (pending_events != NULL) {
678 GstEvent *event = pending_events->data;
680 GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
681 if (!gst_pad_send_event (stream->parsebin_sink, event))
682 GST_ERROR_OBJECT (stream, "Failed to send pending event");
684 pending_events = g_list_delete_link (pending_events, pending_events);
687 GST_DEBUG_OBJECT (stream,
688 "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
689 G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
690 GST_BUFFER_OFFSET (buffer));
692 ret = gst_pad_chain (stream->parsebin_sink, buffer);
695 GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
696 gst_pad_send_event (stream->parsebin_sink, buffer_gap);
/* The stream may have been stopped while the push was in progress. */
699 if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
700 GST_LOG_OBJECT (demux, "Stream was cancelled");
701 return GST_FLOW_FLUSHING;
704 GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
/* Hands one downloaded buffer to the subclass for parsing.
 *
 * A stopped stream drops the buffer and returns GST_FLOW_FLUSHING. On the
 * first buffer of a download the subclass start_fragment() vfunc (if any)
 * is called, and GST_FLOW_ERROR is returned when it fails. The buffer size
 * is added to download_total_bytes and the buffer is passed to the subclass
 * data_received() vfunc. For results below GST_FLOW_EOS an element flow
 * error is posted, EOS is sent to the parsebin sink pad, the result becomes
 * GST_FLOW_ERROR and the stream is put into the ERRORED state. */
710 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
713 GstAdaptiveDemux *demux = stream->demux;
714 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
715 GstFlowReturn ret = GST_FLOW_OK;
717 /* do not make any changes if the stream is cancelled */
718 if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
719 GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
720 gst_buffer_unref (buffer);
721 return GST_FLOW_FLUSHING;
724 /* starting_fragment is set to TRUE at the beginning of
725 * _stream_download_fragment()
726 * /!\ If there is a header/index being downloaded, then this will
727 * be TRUE for the first one ... but FALSE for the remaining ones,
728 * including the *actual* fragment ! */
729 if (stream->starting_fragment) {
730 stream->starting_fragment = FALSE;
/* NOTE(review): the stopped-stream return above unrefs the buffer, but this
 * error return does not - looks like the buffer is leaked when
 * start_fragment() fails; confirm ownership with the caller. */
731 if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
732 return GST_FLOW_ERROR;
735 stream->download_total_bytes += gst_buffer_get_size (buffer);
737 GST_TRACE_OBJECT (stream,
738 "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
739 gst_buffer_get_size (buffer));
741 ret = klass->data_received (demux, stream, buffer);
743 if (ret != GST_FLOW_OK) {
744 GST_DEBUG_OBJECT (stream, "data_received returned %s",
745 gst_flow_get_name (ret));
747 if (ret == GST_FLOW_FLUSHING) {
748 /* do not make any changes if the stream is cancelled */
749 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED) {
750 GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
/* Anything below GST_FLOW_EOS is treated as fatal. */
755 if (ret < GST_FLOW_EOS) {
756 GstEvent *eos = gst_event_new_eos ();
757 GST_ELEMENT_FLOW_ERROR (demux, ret);
759 GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
761 /* TODO push this on all pads */
762 gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
763 gst_pad_send_event (stream->parsebin_sink, eos);
764 ret = GST_FLOW_ERROR;
766 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
/* @demux: demuxer providing the watermark and max-buffering settings
 * @stream: stream providing the recommended buffering threshold
 * @fragment_duration: duration of one fragment, used to convert the
 *                     fragment-count watermarks into time
 * @low_threshold: (out): resulting low watermark, as a time
 * @high_threshold: (out): resulting high watermark, as a time
 *
 * Runs with the demuxer object lock held. When both low settings are 0 the
 * low watermark defaults to 10 seconds, reduced to the stream's recommended
 * threshold if that is valid and smaller. Both watermarks are capped by
 * max_buffering_time when that is non-zero, and the high watermark is
 * raised to the low watermark if it would otherwise be smaller or 0. */
773 /* Calculate the low and high download buffering watermarks
774 * in time as MAX (low-watermark-time, low-watermark-fragments) and
775 * MIN (high-watermark-time, high-watermark-fragments) respectively
778 calculate_track_thresholds (GstAdaptiveDemux * demux,
779 GstAdaptiveDemux2Stream * stream,
780 GstClockTime fragment_duration, GstClockTime * low_threshold,
781 GstClockTime * high_threshold)
783 GST_OBJECT_LOCK (demux);
784 *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
785 if (*low_threshold == 0 ||
786 (demux->buffering_low_watermark_time != 0
787 && demux->buffering_low_watermark_time > *low_threshold)) {
788 *low_threshold = demux->buffering_low_watermark_time;
791 if (*low_threshold == 0) {
792 /* This implies both low level properties were 0, the default is 10s unless
793 * the subclass has specified a recommended buffering threshold */
794 *low_threshold = 10 * GST_SECOND;
795 if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
797 MIN (stream->recommended_buffering_threshold, *low_threshold);
801 demux->buffering_high_watermark_fragments * fragment_duration;
802 if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
803 && demux->buffering_high_watermark_time < *high_threshold)) {
804 *high_threshold = demux->buffering_high_watermark_time;
807 /* Make sure the low and high thresholds are less than the maximum buffering
809 if (*high_threshold == 0 ||
810 (demux->max_buffering_time != 0
811 && demux->max_buffering_time < *high_threshold)) {
812 *high_threshold = demux->max_buffering_time;
815 if (*low_threshold == 0 ||
816 (demux->max_buffering_time != 0
817 && demux->max_buffering_time < *low_threshold)) {
818 *low_threshold = demux->max_buffering_time;
821 /* Make sure the high threshold is higher than (or equal to) the low threshold.
822 * It's OK if they are the same, as the minimum download is 1 fragment */
823 if (*high_threshold == 0 ||
824 (*low_threshold != 0 && *low_threshold > *high_threshold)) {
825 *high_threshold = *low_threshold;
828 GST_OBJECT_UNLOCK (demux);
/* Absolute difference of two values, computed without going negative so it
 * is safe for unsigned operands. Each argument is evaluated more than once,
 * so do not pass expressions with side effects. */
831 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
/* @demux: the demuxer owning @stream
 * @stream: the stream that wants to download more data
 * @fragment_duration: duration of one fragment, used to size the thresholds
 *
 * Decides whether @stream has to wait before downloading more data.
 * The low/high thresholds are computed first and each track's buffering
 * threshold is refreshed when it moved by more than one second. No waiting
 * is needed when the stream has no tracks yet, or when it has no active
 * tracks and no inactive track has reached the high threshold. When waiting
 * is needed, the earliest wakeup time over all tracks is stored in
 * stream->next_input_wakeup_time and propagated to the period if it is
 * sooner than the period's current wakeup time. need_to_wait holds the
 * decision.
 *
 * Wakeup times are signed (GstClockTimeDiff): their "unset" value is
 * GST_CLOCK_STIME_NONE and they are logged with the GST_STIME_* macros. */
833 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
834 GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
836 gboolean need_to_wait = TRUE;
837 gboolean have_any_tracks = FALSE;
838 gboolean have_active_tracks = FALSE;
839 gboolean have_filled_inactive = FALSE;
840 gboolean update_buffering = FALSE;
842 GstClockTime low_threshold = 0, high_threshold = 0;
845 calculate_track_thresholds (demux, stream, fragment_duration,
846 &low_threshold, &high_threshold);
847 GST_DEBUG_OBJECT (stream,
848 "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
849 " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
850 GST_TIME_ARGS (high_threshold),
851 GST_TIME_ARGS (stream->recommended_buffering_threshold));
853 /* If there are no tracks at all, don't wait. If there are no active
854 * tracks, keep filling until at least one track is full. If there
855 * are active tracks, require that they are all full */
857 for (iter = stream->tracks; iter; iter = iter->next) {
858 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
860 /* Update the buffering threshold if it changed by more than a second */
861 if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
862 GST_DEBUG_OBJECT (stream, "Updating threshold");
863 /* The buffering threshold for this track changed, make sure to
864 * re-check buffering status */
865 update_buffering = TRUE;
866 track->buffering_threshold = low_threshold;
869 have_any_tracks = TRUE;
871 have_active_tracks = TRUE;
873 if (track->level_time < high_threshold) {
875 need_to_wait = FALSE;
876 GST_DEBUG_OBJECT (stream,
877 "track %s has level %" GST_TIME_FORMAT
878 " - needs more data (target %" GST_TIME_FORMAT
879 ") (fragment duration %" GST_TIME_FORMAT ")",
880 track->stream_id, GST_TIME_ARGS (track->level_time),
881 GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
884 } else if (!track->active) { /* track is over threshold and inactive */
885 have_filled_inactive = TRUE;
888 GST_DEBUG_OBJECT (stream,
889 "track %s active (%d) has level %" GST_TIME_FORMAT,
890 track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
893 /* If there are no tracks, don't wait (we might need data to create them),
894 * or if there are active tracks that need more data to hit the threshold,
895 * don't wait. Otherwise it means all active tracks are full and we should wait */
896 if (!have_any_tracks) {
897 GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
898 need_to_wait = FALSE;
899 } else if (!have_active_tracks && !have_filled_inactive) {
900 GST_DEBUG_OBJECT (stream,
901 "have only inactive tracks that need more data - not waiting");
902 need_to_wait = FALSE;
906 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
908 for (iter = stream->tracks; iter; iter = iter->next) {
909 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
911 GST_DEBUG_OBJECT (stream,
912 "Waiting for queued data on track %s to drop below %"
913 GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
914 track->stream_id, GST_TIME_ARGS (high_threshold),
915 GST_TIME_ARGS (fragment_duration));
917 /* we want to get woken up when the global output position reaches
918 * a point where the input is closer than "high_threshold" to needing
919 * output, so we can put more data in */
920 GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
922 if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
923 wakeup_time < stream->next_input_wakeup_time) {
924 stream->next_input_wakeup_time = wakeup_time;
/* wakeup_time is signed and may be negative, so it is printed with the
 * signed time format. */
926 GST_DEBUG_OBJECT (stream,
927 "Track %s level %" GST_TIME_FORMAT ". Input at position %"
928 GST_TIME_FORMAT " next wakeup should be %" GST_STIME_FORMAT " now %"
929 GST_TIME_FORMAT, track->stream_id,
930 GST_TIME_ARGS (track->level_time),
931 GST_TIME_ARGS (track->input_time), GST_STIME_ARGS (wakeup_time),
932 GST_TIME_ARGS (demux->priv->global_output_position));
/* Compare against the signed sentinel: the unsigned GST_CLOCK_TIME_NONE can
 * never match a value that was set to GST_CLOCK_STIME_NONE. */
936 if (stream->next_input_wakeup_time != GST_CLOCK_STIME_NONE) {
937 GST_DEBUG_OBJECT (stream,
938 "Next input wakeup time is now %" GST_STIME_FORMAT,
939 GST_STIME_ARGS (stream->next_input_wakeup_time));
941 /* If this stream needs waking up sooner than any other current one,
942 * update the period wakeup time, which is what the output loop
944 GstAdaptiveDemuxPeriod *period = stream->period;
945 if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
946 period->next_input_wakeup_time > stream->next_input_wakeup_time) {
947 period->next_input_wakeup_time = stream->next_input_wakeup_time;
952 if (update_buffering) {
953 demux_update_buffering_locked (demux);
954 demux_post_buffering_locked (demux);
957 TRACKS_UNLOCK (demux);
962 static GstAdaptiveDemuxTrack *
963 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
966 GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
967 gint num_possible_tracks = 0;
968 GstStream *gst_stream;
969 const gchar *internal_stream_id;
970 GstStreamType stream_type;
972 gst_stream = gst_pad_get_stream (pad);
974 /* FIXME: Edward: Added assertion because I don't see in what cases we would
975 * end up with a pad from parsebin which wouldn't have an associated
977 g_assert (gst_stream);
979 internal_stream_id = gst_stream_get_stream_id (gst_stream);
980 stream_type = gst_stream_get_stream_type (gst_stream);
982 GST_DEBUG_OBJECT (pad,
983 "Trying to match pad from parsebin with internal streamid %s and caps %"
984 GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
985 gst_stream_get_caps (gst_stream));
987 /* Try to match directly by the track's pending upstream_stream_id */
988 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
989 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
991 if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
994 GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
995 track->upstream_stream_id);
997 if (first_matched_track == NULL)
998 first_matched_track = track;
999 num_possible_tracks++;
1001 /* If this track has a desired upstream stream id, match on it */
1002 if (track->upstream_stream_id == NULL ||
1003 g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
1004 /* This is not the track for this pad */
1008 /* Remove pending upstream id (we have matched it for the pending
1010 g_free (track->upstream_stream_id);
1011 track->upstream_stream_id = NULL;
1012 found_track = track;
1016 if (found_track == NULL) {
1017 /* If we arrive here, it means the stream is switching pads after
1018 * the stream has already started running */
1019 /* No track is currently waiting for this particular stream id -
1020 * try and match an existing linked track. If there's only 1 possible,
1022 if (num_possible_tracks == 1 && first_matched_track != NULL) {
1023 GST_LOG_OBJECT (pad, "Only one possible track to link to");
1024 found_track = first_matched_track;
1028 if (found_track == NULL) {
1029 /* TODO: There are multiple possible tracks, need to match based
1030 * on language code and caps. Have you found a stream like this? */
1031 GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1034 if (found_track != NULL) {
1035 if (!gst_pad_is_linked (found_track->sinkpad)) {
1036 GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1037 found_track->sinkpad);
1039 if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1040 GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1041 /* FIXME : Do something if we can't link ? */
1044 /* Store pad as pending link */
1045 GST_LOG_OBJECT (pad,
1046 "Remembering pad to be linked when current pad is unlinked");
1047 g_assert (found_track->pending_srcpad == NULL);
1048 found_track->pending_srcpad = gst_object_ref (pad);
1053 gst_object_unref (gst_stream);
1059 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1060 GstAdaptiveDemux2Stream * stream)
1063 GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1065 /* Remove from pending source pad */
1066 TRACKS_LOCK (stream->demux);
1067 for (iter = stream->tracks; iter; iter = iter->next) {
1068 GstAdaptiveDemuxTrack *track = iter->data;
1069 if (track->pending_srcpad == pad) {
1070 gst_object_unref (track->pending_srcpad);
1071 track->pending_srcpad = NULL;
1075 TRACKS_UNLOCK (stream->demux);
1079 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1080 GstAdaptiveDemux2Stream * stream)
1082 if (!GST_PAD_IS_SRC (pad))
1085 GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1087 if (!match_parsebin_to_track (stream, pad))
1088 GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1090 GST_DEBUG_OBJECT (stream->demux, "Done linking");
1094 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1095 GstElement * element, GstAdaptiveDemux * demux)
1097 if (G_OBJECT_TYPE (element) == tsdemux_type) {
1098 GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1099 g_object_set (element, "ignore-pcr", TRUE, NULL);
1103 /* must be called with manifest_lock taken */
1105 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1107 GstAdaptiveDemux *demux = stream->demux;
1109 if (stream->parsebin == NULL) {
1112 GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1114 /* Workaround to detect if tsdemux is being used */
1115 if (tsdemux_type == 0) {
1116 GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1118 tsdemux_type = G_OBJECT_TYPE (element);
1119 gst_object_unref (element);
1123 stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1125 g_signal_connect (stream->parsebin, "deep-element-added",
1126 (GCallback) parsebin_deep_element_added_cb, demux);
1127 gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1128 stream->parsebin_sink =
1129 gst_element_get_static_pad (stream->parsebin, "sink");
1130 stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1131 G_CALLBACK (parsebin_pad_added_cb), stream);
1132 stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1133 G_CALLBACK (parsebin_pad_removed_cb), stream);
1135 event = gst_event_new_stream_start ("bogus");
1136 if (demux->have_group_id)
1137 gst_event_set_group_id (event, demux->group_id);
1139 gst_pad_send_event (stream->parsebin_sink, event);
1141 /* Not sure if these need to be outside the manifest lock: */
1142 gst_element_sync_state_with_parent (stream->parsebin);
1143 stream->last_status_code = 200; /* default to OK */
1149 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1150 GstAdaptiveDemux2Stream * stream)
1155 on_download_error (DownloadRequest * request, DownloadRequestState state,
1156 GstAdaptiveDemux2Stream * stream)
1158 GstAdaptiveDemux *demux = stream->demux;
1159 guint last_status_code = request->status_code;
1162 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1163 GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1168 stream->download_active = FALSE;
1169 stream->last_status_code = last_status_code;
1171 GST_DEBUG_OBJECT (stream,
1172 "Download finished with error, request state %d http status %u, dc %d",
1173 request->state, last_status_code, stream->download_error_count);
1175 live = gst_adaptive_demux_is_live (demux);
1176 if (((last_status_code / 100 == 4 && live)
1177 || last_status_code / 100 == 5)) {
1179 /* if current position is before available start, switch to next */
1180 if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1184 gint64 range_start, range_stop;
1186 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1190 if (demux->segment.position < range_start) {
1193 GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1194 gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1196 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1198 ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1199 GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1200 gst_flow_get_name (ret));
1202 if (ret == GST_FLOW_OK)
1205 } else if (demux->segment.position > range_stop) {
1206 /* wait a bit to be in range, we don't have any locks at that point */
1207 GstClockTime wait_time =
1208 gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1210 if (wait_time > 0) {
1211 GST_DEBUG_OBJECT (stream,
1212 "Download waiting for %" GST_TIME_FORMAT,
1213 GST_TIME_ARGS (wait_time));
1214 g_assert (stream->pending_cb_id == 0);
1215 GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1216 stream->pending_cb_id =
1217 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1219 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1220 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1226 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1227 /* looks like there is no way of knowing when a live stream has ended
1228 * Have to assume we are falling behind and cause a manifest reload */
1229 GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1230 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1233 } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1234 /* If this is the last fragment, consider failures EOS and not actual
1235 * errors. Due to rounding errors in the durations, the last fragment
1236 * might not actually exist */
1237 GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1238 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1241 /* retry same segment */
1242 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1243 gst_adaptive_demux2_stream_error (stream);
1250 /* wait a short time in case the server needs a bit to recover */
1251 GST_LOG_OBJECT (stream,
1252 "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1253 g_assert (stream->pending_cb_id == 0);
1254 stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND, /* Retry in 10 ms */
1255 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1256 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1260 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1261 DownloadRequest * request)
1263 GstClockTimeDiff last_download_duration;
1264 guint64 fragment_bytes_downloaded = request->content_received;
1266 /* The stream last_download time tracks the full download time for now */
1267 stream->last_download_time =
1268 GST_CLOCK_DIFF (request->download_request_time,
1269 request->download_end_time);
1271 /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1272 last_download_duration =
1273 GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1275 /* If the entire response arrived in the first buffer
1276 * though, include the request time to get a valid
1277 * bitrate estimate */
1278 if (last_download_duration < 2 * stream->last_download_time)
1279 last_download_duration = stream->last_download_time;
1281 if (last_download_duration > 0) {
1282 stream->last_bitrate =
1283 gst_util_uint64_scale (fragment_bytes_downloaded,
1284 8 * GST_SECOND, last_download_duration);
1286 GST_DEBUG_OBJECT (stream,
1287 "Updated stream bitrate. %" G_GUINT64_FORMAT
1288 " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1289 G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1290 GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1295 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1296 GstAdaptiveDemux2Stream * stream)
1298 GstAdaptiveDemux *demux = stream->demux;
1299 GstBuffer *buffer = download_request_take_buffer (request);
1304 GST_DEBUG_OBJECT (stream,
1305 "Handling buffer of %" G_GSIZE_FORMAT
1306 " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1307 G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1308 request->content_received, request->content_length);
1310 /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1311 download_request_unlock (request);
1312 ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1313 download_request_lock (request);
1315 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1318 if (ret != GST_FLOW_OK) {
1319 GST_DEBUG_OBJECT (stream,
1320 "Buffer parsing returned: %d %s. Aborting download", ret,
1321 gst_flow_get_name (ret));
1323 if (!stream->downloading_header && !stream->downloading_index)
1324 update_stream_bitrate (stream, request);
1326 downloadhelper_cancel_request (demux->download_helper, request);
1328 /* cancellation is async, so recycle our download request to avoid races */
1329 download_request_unref (stream->download_request);
1330 stream->download_request = download_request_new ();
1332 gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1338 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1339 GstAdaptiveDemux2Stream * stream)
1341 GstFlowReturn ret = GST_FLOW_OK;
1344 stream->download_active = FALSE;
1346 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
1347 GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
1352 GST_DEBUG_OBJECT (stream,
1353 "Stream %p %s download for %s is complete with state %d",
1354 stream, uritype (stream), request->uri, request->state);
1356 /* Update bitrate for fragment downloads */
1357 if (!stream->downloading_header && !stream->downloading_index)
1358 update_stream_bitrate (stream, request);
1360 buffer = download_request_take_buffer (request);
1362 ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1364 GST_DEBUG_OBJECT (stream,
1365 "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1366 request->uri, ret, gst_flow_get_name (ret), stream->state);
1368 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1371 g_assert (stream->pending_cb_id == 0);
1372 gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1375 /* must be called from the scheduler context
1377 * Will submit the request only, which will complete asynchronously
1379 static GstFlowReturn
1380 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1381 GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1384 DownloadRequest *request = stream->download_request;
1386 GST_DEBUG_OBJECT (demux,
1387 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1388 uritype (stream), uri, start, end);
1390 if (!gst_adaptive_demux2_stream_create_parser (stream))
1391 return GST_FLOW_ERROR;
1393 /* Configure our download request */
1394 download_request_set_uri (request, uri, start, end);
1396 if (stream->downloading_header || stream->downloading_index) {
1397 download_request_set_callbacks (request,
1398 (DownloadRequestEventCallback) on_download_complete,
1399 (DownloadRequestEventCallback) on_download_error,
1400 (DownloadRequestEventCallback) on_download_cancellation,
1401 (DownloadRequestEventCallback) NULL, stream);
1403 download_request_set_callbacks (request,
1404 (DownloadRequestEventCallback) on_download_complete,
1405 (DownloadRequestEventCallback) on_download_error,
1406 (DownloadRequestEventCallback) on_download_cancellation,
1407 (DownloadRequestEventCallback) on_download_progress, stream);
1410 if (!downloadhelper_submit_request (demux->download_helper,
1411 demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1412 return GST_FLOW_ERROR;
1414 stream->download_active = TRUE;
1419 /* must be called from the scheduler context */
1420 static GstFlowReturn
1421 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1423 GstAdaptiveDemux *demux = stream->demux;
1424 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1428 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1429 if (stream->starting_fragment) {
1430 GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1431 stream->fragment.uri ? "FRAGMENT " : "",
1432 stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1433 stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
1435 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1436 stream->fragment.index_uri == NULL)
1439 stream->first_fragment_buffer = TRUE;
1440 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1443 if (stream->need_header && stream->fragment.header_uri != NULL) {
1445 /* Set the need_index flag when we start the header if we'll also need the index */
1446 stream->need_index = (stream->fragment.index_uri != NULL);
1448 GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1449 G_GINT64_FORMAT, stream->fragment.header_uri,
1450 stream->fragment.header_range_start, stream->fragment.header_range_end);
1452 stream->downloading_header = TRUE;
1454 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1455 stream->fragment.header_uri, stream->fragment.header_range_start,
1456 stream->fragment.header_range_end);
1459 /* check if we have an index */
1460 if (stream->need_index && stream->fragment.index_uri != NULL) {
1461 GST_DEBUG_OBJECT (stream,
1462 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1463 stream->fragment.index_uri,
1464 stream->fragment.index_range_start, stream->fragment.index_range_end);
1466 stream->downloading_index = TRUE;
1468 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1469 stream->fragment.index_uri, stream->fragment.index_range_start,
1470 stream->fragment.index_range_end);
1473 url = stream->fragment.uri;
1474 GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1478 /* Download the actual fragment, either in chunks or in one go */
1479 stream->first_fragment_buffer = TRUE;
1481 if (klass->need_another_chunk && klass->need_another_chunk (stream)
1482 && stream->fragment.chunk_size != 0) {
1483 /* Handle chunk downloading */
1484 gint64 range_start = stream->fragment.range_start;
1485 gint64 range_end = stream->fragment.range_end;
1486 gint chunk_size = stream->fragment.chunk_size;
1489 /* HTTP ranges are inclusive for the end */
1490 if (chunk_size != -1) {
1491 chunk_end = range_start + chunk_size - 1;
1492 if (range_end != -1 && range_end < chunk_end)
1493 chunk_end = range_end;
1495 chunk_end = range_end;
1498 GST_DEBUG_OBJECT (stream,
1499 "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1500 url, range_start, chunk_end);
1501 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1502 range_start, chunk_end);
1505 /* regular single chunk download */
1506 stream->fragment.chunk_size = 0;
1508 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1509 stream->fragment.range_start, stream->fragment.range_end);
1513 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1514 (_("Failed to get fragment URL.")),
1515 ("An error happened when getting fragment URL"));
1516 return GST_FLOW_ERROR;
1521 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1524 gboolean ret = TRUE;
1527 /* If there's a parsebin, push the event through it */
1528 if (stream->parsebin_sink != NULL) {
1529 pad = gst_object_ref (stream->parsebin_sink);
1530 GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1531 ret = gst_pad_send_event (pad, gst_event_ref (event));
1532 gst_object_unref (pad);
1535 /* If the event is EOS, ensure that all tracks are EOS. This catches
1536 * the case where the parsebin hasn't parsed anything yet (we switched
1537 * to a never before used track right near EOS, or it didn't parse enough
1538 * to create pads and be able to send EOS through to the tracks.
1540 * We don't need to care about any other events
1542 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1545 for (iter = stream->tracks; iter; iter = iter->next) {
1546 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1547 ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1551 gst_event_unref (event);
1556 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1558 GstAdaptiveDemux *demux = stream->demux;
1560 GstStructure *details;
1562 details = gst_structure_new_empty ("details");
1563 gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1564 stream->last_status_code, NULL);
1566 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1568 if (stream->last_error) {
1569 gchar *debug = g_strdup_printf ("Error on stream %s",
1570 GST_OBJECT_NAME (stream));
1572 gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1573 stream->last_error, debug, details);
1574 GST_ERROR_OBJECT (stream, "Download error: %s",
1575 stream->last_error->message);
1578 GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1579 _("Couldn't download fragments"));
1581 gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1582 "Fragment downloading has failed consecutive times", details);
1584 GST_ERROR_OBJECT (stream,
1585 "Download error: Couldn't download fragments, too many failures");
1588 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1591 /* Called when a stream reaches the end of a playback segment */
1593 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1595 GstAdaptiveDemux *demux = stream->demux;
1596 GstFlowReturn combined =
1597 gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1599 GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1601 if (gst_adaptive_demux_has_next_period (demux)) {
1602 if (combined == GST_FLOW_EOS) {
1603 GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1604 gst_adaptive_demux_advance_period (demux);
1606 /* Ensure the 'has_next_period' flag is set on the period before
1607 * pushing EOS to the stream, so that the output loop knows not
1608 * to actually output the event */
1609 GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1610 demux->input_period->has_next_period = TRUE;
1614 if (demux->priv->outputs) {
1615 GstEvent *eos = gst_event_new_eos ();
1617 GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1618 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1620 gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1621 gst_adaptive_demux2_stream_push_event (stream, eos);
1623 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1624 gst_adaptive_demux2_stream_error (stream);
1629 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1631 GstAdaptiveDemux *demux = stream->demux;
1633 gboolean is_live = gst_adaptive_demux_is_live (demux);
1635 stream->pending_cb_id = 0;
1637 /* Refetch the playlist now after we waited */
1638 /* FIXME: Make this manifest update async and handle it on completion */
1639 if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1640 GST_DEBUG_OBJECT (demux, "Updated the playlist");
1643 /* We were called here from a timeout, so if the load function wants to loop
1644 * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1646 while (gst_adaptive_demux2_stream_next_download (stream));
1648 return G_SOURCE_REMOVE;
1652 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1655 /* If the state already moved on, the stream was stopped, or another track
1656 * already woke up and needed data */
1657 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1658 return G_SOURCE_REMOVE;
1660 GstAdaptiveDemux *demux = stream->demux;
1661 TRACKS_LOCK (demux);
1664 for (iter = stream->tracks; iter; iter = iter->next) {
1665 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1667 /* We need to recompute the track's level_time value, as the
1668 * global output position may have advanced and reduced the
1669 * value, even without anything being dequeued yet */
1670 gst_adaptive_demux_track_update_level_locked (track);
1672 GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1673 " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1674 track->stream_id, GST_TIME_ARGS (track->level_time),
1675 GST_TIME_ARGS (track->input_time),
1676 GST_TIME_ARGS (demux->priv->global_output_position));
1678 TRACKS_UNLOCK (demux);
1680 while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1682 return G_SOURCE_REMOVE;
1686 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1689 GstAdaptiveDemux *demux = stream->demux;
1691 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1693 GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1695 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1696 (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1697 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1701 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1703 GstAdaptiveDemux *demux = stream->demux;
1705 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1708 g_assert (stream->pending_cb_id == 0);
1710 GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1711 stream->pending_cb_id =
1712 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1713 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1714 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1718 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1721 GstAdaptiveDemux *demux = stream->demux;
1723 if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1724 || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1726 if (!gst_adaptive_demux_has_next_period (demux)) {
1727 /* Wait only if we can ensure current manifest has been expired.
1728 * The meaning "we have next period" *WITH* EOS is that, current
1729 * period has been ended but we can continue to the next period */
1730 GST_DEBUG_OBJECT (stream,
1731 "Live playlist EOS - waiting for manifest update");
1732 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1733 /* Clear the stream last_ret EOS state, since we're not actually EOS */
1734 if (stream->last_ret == GST_FLOW_EOS)
1735 stream->last_ret = GST_FLOW_OK;
1736 gst_adaptive_demux2_stream_wants_manifest_update (demux);
1741 gst_adaptive_demux2_stream_end_of_manifest (stream);
1745 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1747 GstAdaptiveDemux *demux = stream->demux;
1748 gboolean live = gst_adaptive_demux_is_live (demux);
1749 GstFlowReturn ret = GST_FLOW_OK;
1751 stream->pending_cb_id = 0;
1753 GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1755 switch (stream->state) {
1756 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1757 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1758 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1759 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1760 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1761 /* Get information about the fragment to download */
1762 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1763 ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1764 GST_DEBUG_OBJECT (stream,
1765 "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1767 if (ret == GST_FLOW_OK)
1768 stream->starting_fragment = TRUE;
1770 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1772 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1773 GST_ERROR_OBJECT (stream,
1774 "Unexpected stream state EOS. The stream should not be running now.");
1776 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED:
1777 /* The stream was stopped. Just finish up */
1780 GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1781 g_assert_not_reached ();
1785 if (ret == GST_FLOW_OK) {
1786 /* Wait for room in the output tracks */
1787 if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1788 stream->fragment.duration)) {
1789 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1794 if (ret == GST_FLOW_OK) {
1795 /* wait for live fragments to be available */
1797 GstClockTime wait_time =
1798 gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1799 if (wait_time > 0) {
1800 GST_DEBUG_OBJECT (stream,
1801 "Download waiting for %" GST_TIME_FORMAT,
1802 GST_TIME_ARGS (wait_time));
1804 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1806 GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1807 g_assert (stream->pending_cb_id == 0);
1808 stream->pending_cb_id =
1809 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1810 wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1811 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1816 if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1817 GST_ERROR_OBJECT (demux,
1818 "Failed to begin fragment download for stream %p", stream);
1823 /* Cast to int avoids a compiler warning that
1824 * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1825 switch ((int) ret) {
1827 break; /* all is good, let's go */
1829 GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1830 stream->last_ret = ret;
1831 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1833 case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1834 GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1835 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1836 gst_adaptive_demux_handle_lost_sync (demux);
1838 case GST_FLOW_NOT_LINKED:
1840 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1842 if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1843 == GST_FLOW_NOT_LINKED) {
1844 GST_ELEMENT_FLOW_ERROR (demux, ret);
1849 case GST_FLOW_FLUSHING:
1850 /* Flushing is normal, the target track might have been unselected */
1851 GST_DEBUG_OBJECT (stream, "Got flushing return. Stopping callback.");
1854 if (ret <= GST_FLOW_ERROR) {
1855 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1856 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1857 gst_adaptive_demux2_stream_error (stream);
1861 g_clear_error (&stream->last_error);
1863 /* First try to update the playlist for non-live playlists
1864 * in case the URIs have changed in the meantime. But only
1865 * try it the first time, after that we're going to wait a
1866 * a bit to not flood the server */
1867 if (stream->download_error_count == 1
1868 && !gst_adaptive_demux_is_live (demux)) {
1869 /* TODO hlsdemux had more options to this function (boolean and err) */
1870 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1871 /* Retry immediately, the playlist actually has changed */
1872 GST_DEBUG_OBJECT (demux, "Updated the playlist");
1877 /* Wait half the fragment duration before retrying */
1878 GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1879 g_assert (stream->pending_cb_id == 0);
1880 stream->pending_cb_id =
1881 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1882 stream->fragment.duration / 2,
1883 (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1884 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* Callback that decides what @stream does next. It is queued on the
 * scheduler task by gst_adaptive_demux2_stream_start() (cast to a
 * GSourceFunc).
 *
 * Two phases:
 *  1. If the stream is in the RESTART state: flush an existing parsebin,
 *     re-seek to stream->start_position, request a new parsebin input
 *     segment and move the stream to START_FRAGMENT.
 *  2. Compare stream->current_position against the demuxer segment
 *     boundary for the current playback direction; either report
 *     end-of-manifest or go on to load a fragment.
 *
 * Returns the result of gst_adaptive_demux2_stream_load_a_fragment() on the
 * normal path. NOTE(review): the value returned on the end-of-manifest
 * path should be confirmed. */
1894 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1896 GstAdaptiveDemux *demux = stream->demux;
1897 gboolean end_of_manifest = FALSE;
1899 GST_LOG_OBJECT (stream, "Looking for next download");
1901 /* Restarting download, figure out new position
1902 * FIXME : Move this to a separate function ? */
1903 if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1904 GstClockTimeDiff stream_time = 0;
1906 GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1908 if (stream->parsebin_sink != NULL) {
1909 /* If the parsebin already exists, we need to clear it out (if it doesn't,
1910 * this is the first time we've used this stream, so it's all good) */
1911 gst_adaptive_demux2_stream_push_event (stream,
1912 gst_event_new_flush_start ());
1913 gst_adaptive_demux2_stream_push_event (stream,
1914 gst_event_new_flush_stop (FALSE));
/* The segment lock is held from here until the matching UNLOCK below,
 * i.e. while demux->segment is read, the seek is issued and the stream
 * position is updated. */
1917 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1918 stream_time = stream->start_position;
1920 GST_DEBUG_OBJECT (stream, "Restarting stream at "
1921 "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
/* Only seek when a valid start position is known. */
1923 if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1924 /* TODO check return */
/* stream_time is passed both as the target and as the output location, so
 * the seek may overwrite it. The third argument is derived from the sign
 * of the segment rate (presumably a "forward" flag; confirm against the
 * gst_adaptive_demux2_stream_seek() prototype). */
1925 gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1926 0, stream_time, &stream_time);
/* current_position is reset to the requested start position, not to the
 * value the seek wrote back into stream_time. */
1927 stream->current_position = stream->start_position;
1929 GST_DEBUG_OBJECT (stream,
1930 "stream_time after restart seek: %" GST_STIME_FORMAT
1931 " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1932 GST_STIME_ARGS (stream->current_position));
1935 /* Trigger (re)computation of the parsebin input segment */
1936 stream->compute_segment = TRUE;
1938 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* Flag the next output as discontinuous, request the header again and
 * leave the RESTART state. */
1940 stream->discont = TRUE;
1941 stream->need_header = TRUE;
1942 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1945 /* Check if we're done with our segment */
1946 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
/* Positive rate: done once the position has reached a valid segment.stop. */
1947 if (demux->segment.rate > 0) {
1948 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1949 && stream->current_position >= demux->segment.stop) {
1950 end_of_manifest = TRUE;
/* Done once the position has dropped to a valid segment.start (presumably
 * the counterpart of the check above for non-positive rates; confirm). */
1953 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1954 && stream->current_position <= demux->segment.start) {
1955 end_of_manifest = TRUE;
1958 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* The end-of-manifest handler is invoked after the segment lock has been
 * released. */
1960 if (end_of_manifest) {
1961 gst_adaptive_demux2_stream_end_of_manifest (stream);
/* Otherwise start loading a fragment and hand its result back to the caller. */
1964 return gst_adaptive_demux2_stream_load_a_fragment (stream);
/* Ask the demuxer subclass whether @stream may be started now, by calling
 * the class's stream_can_start() vfunc with the demuxer and the stream.
 * NOTE(review): the vfunc is optional (it is NULL-checked below); the
 * result used when a subclass does not implement it should be confirmed. */
1968 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1970 GstAdaptiveDemux *demux = stream->demux;
1971 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Subclasses are not required to provide stream_can_start(). */
1973 if (!klass->stream_can_start)
1975 return klass->stream_can_start (demux, stream);
1979 * gst_adaptive_demux2_stream_start:
1980 * @stream: a #GstAdaptiveDemux2Stream
1982 * Start the given @stream. Should be called by subclasses that previously
1983 * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
1986 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1988 GstAdaptiveDemux *demux;
/* Reject a NULL stream or one that has no demuxer attached. */
1990 g_return_if_fail (stream && stream->demux);
1992 demux = stream->demux;
/* A queued scheduler callback or an active download both count as
 * "already running". */
1994 if (stream->pending_cb_id != 0 || stream->download_active) {
1995 /* There is already something active / pending on this stream */
1996 GST_LOG_OBJECT (stream, "Stream already running");
2000 /* Some streams require a delayed start, i.e. they need more information
2001 * before they can actually be started */
2002 if (!gst_adaptive_demux2_stream_can_start (stream)) {
2003 GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
/* A stream that already reached EOS is only logged here (presumably
 * nothing is scheduled for it; confirm). */
2007 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
2008 GST_LOG_OBJECT (stream, "Stream is EOS already");
/* STOPPED and RESTART streams get their last flow return reset. Only a
 * STOPPED stream is moved to START_FRAGMENT; a RESTART stream keeps its
 * state so that gst_adaptive_demux2_stream_next_download() can run its
 * restart handling. */
2012 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2013 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
2014 GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
2016 stream->last_ret = GST_FLOW_OK;
2018 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2019 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
/* Queue next_download() on the scheduler task and remember the call id in
 * pending_cb_id. The queued call holds its own reference on the stream,
 * released through the gst_object_unref destroy notify. */
2022 GST_LOG_OBJECT (stream, "Scheduling next_download() call");
2023 stream->pending_cb_id =
2024 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2025 (GSourceFunc) gst_adaptive_demux2_stream_next_download,
2026 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* Stop @stream: mark it STOPPED, cancel any callback still queued on the
 * scheduler task, cancel the current download request and replace it with
 * a fresh one, then clear the download bookkeeping flags and the next
 * input wakeup time. */
2030 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
2032 GstAdaptiveDemux *demux = stream->demux;
/* The previous state is logged before it is overwritten. */
2034 GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
2035 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
/* pending_cb_id == 0 means no callback is queued. */
2037 if (stream->pending_cb_id != 0) {
2038 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2039 stream->pending_cb_id);
2040 stream->pending_cb_id = 0;
2043 /* Cancel and drop the existing download request */
2044 downloadhelper_cancel_request (demux->download_helper,
2045 stream->download_request);
2046 download_request_unref (stream->download_request);
/* Neither a header nor an index download is in progress any more. */
2047 stream->downloading_header = stream->downloading_index = FALSE;
/* Start over with a new request object so stream->download_request is
 * never left pointing at the one that was just unreffed. */
2048 stream->download_request = download_request_new ();
2049 stream->download_active = FALSE;
/* Forget any scheduled input wakeup. */
2051 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
/* Report whether @stream is running, based purely on stream->state. The
 * STOPPED, RESTART and EOS states are each tested explicitly.
 * NOTE(review): presumably those three states mean "not running" and every
 * other state means "running"; confirm against the values returned. */
2055 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2057 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2059 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2061 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
/* Walk stream->tracks looking for a track whose `selected` flag is set.
 * The wrapper gst_adaptive_demux2_stream_is_selected() takes TRACKS_LOCK
 * around this call; as the _locked suffix suggests, direct callers are
 * presumably expected to hold that lock already. */
2067 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
/* Each list node's data is a GstAdaptiveDemuxTrack. */
2071 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2072 GstAdaptiveDemuxTrack *track = tmp->data;
2073 if (track->selected)
2081 * gst_adaptive_demux2_stream_is_selected:
2082 * @stream: A #GstAdaptiveDemux2Stream
2084 * Returns: %TRUE if any of the tracks targeted by @stream is selected
2087 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
/* A NULL stream, or one without a demuxer, yields FALSE. */
2091 g_return_val_if_fail (stream && stream->demux, FALSE);
/* The tracks lock is held only for the duration of the track-list check
 * done by the _locked variant. */
2093 TRACKS_LOCK (stream->demux);
2094 ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2095 TRACKS_UNLOCK (stream->demux);