3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
30 #include "gstadaptivedemux.h"
31 #include "gstadaptivedemux-private.h"
33 #include <glib/gi18n-lib.h>
34 #include <gst/app/gstappsrc.h>
36 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
37 #define GST_CAT_DEFAULT adaptivedemux2_debug
39 static void gst_adaptive_demux2_stream_finalize (GObject * object);
40 static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
42 #define gst_adaptive_demux2_stream_parent_class parent_class
43 G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
/* GObject class initializer for GstAdaptiveDemux2Stream.
 * Installs gst_adaptive_demux2_stream_finalize() as the finalize handler. */
47 gst_adaptive_demux2_stream_class_init (GstAdaptiveDemux2StreamClass * klass)
49 GObjectClass *gobject_class = (GObjectClass *) klass;
51 gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
54 static GType tsdemux_type = 0;
/* Instance initializer: puts a newly created stream into its default state.
 * The download request and the fragment_bitrates array allocated here are
 * released again by gst_adaptive_demux2_stream_finalize(). */
57 gst_adaptive_demux2_stream_init (GstAdaptiveDemux2Stream * stream)
59 stream->download_request = download_request_new ();
60 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
61 stream->last_ret = GST_FLOW_OK;
62 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
/* Zero-filled array with NUM_LOOKBACK_FRAGMENTS guint64 entries */
64 stream->fragment_bitrates =
65 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
/* No start or current position is known yet */
67 stream->start_position = stream->current_position = GST_CLOCK_TIME_NONE;
/* The parse segment works in GST_FORMAT_TIME */
69 gst_segment_init (&stream->parse_segment, GST_FORMAT_TIME);
/* GObject finalize handler for GstAdaptiveDemux2Stream.
 * Marks the stream as cancelled and releases what it holds: the download
 * request, last_error, the current fragment info, pending events, caps and
 * tags, the parsebin sink pad, the parsebin element (signal handlers are
 * disconnected, the element is removed from the demux bin and set to
 * GST_STATE_NULL), the fragment_bitrates array, the track references and
 * the stream collection. Finally chains up to the parent class finalize.
 *
 * NOTE(review): the comment that follows talks about the manifest_lock,
 * joining a task and gst_adaptive_demux2_stream_free(); none of that is
 * visible in this function - confirm whether it is still accurate. */
72 /* must be called with manifest_lock taken.
73 * It will temporarily drop the manifest_lock in order to join the task.
74 * It will join only the old_streams (the demux->streams are joined by
75 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux2_stream_free is
79 gst_adaptive_demux2_stream_finalize (GObject * object)
81 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) object;
83 GST_LOG_OBJECT (object, "Finalizing");
85 if (stream->download_request)
86 download_request_unref (stream->download_request);
88 stream->cancelled = TRUE;
89 g_clear_error (&stream->last_error);
91 gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
93 if (stream->pending_events) {
94 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
95 stream->pending_events = NULL;
98 if (stream->parsebin_sink) {
99 gst_object_unref (stream->parsebin_sink);
100 stream->parsebin_sink = NULL;
103 if (stream->pad_added_id)
104 g_signal_handler_disconnect (stream->parsebin, stream->pad_added_id);
105 if (stream->pad_removed_id)
106 g_signal_handler_disconnect (stream->parsebin, stream->pad_removed_id);
108 if (stream->parsebin != NULL) {
109 GST_LOG_OBJECT (stream, "Removing parsebin");
110 gst_bin_remove (GST_BIN_CAST (stream->demux), stream->parsebin);
111 gst_element_set_state (stream->parsebin, GST_STATE_NULL);
112 gst_object_unref (stream->parsebin);
113 stream->parsebin = NULL;
116 g_free (stream->fragment_bitrates);
118 g_list_free_full (stream->tracks,
119 (GDestroyNotify) gst_adaptive_demux_track_unref);
121 if (stream->pending_caps)
122 gst_caps_unref (stream->pending_caps);
124 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
125 g_clear_pointer (&stream->stream_collection, gst_object_unref);
127 G_OBJECT_CLASS (parent_class)->finalize (object);
131 * gst_adaptive_demux2_stream_add_track:
132 * @stream: A #GstAdaptiveDemux2Stream
133 * @track: (transfer none): A #GstAdaptiveDemuxTrack to assign to the @stream
135 * This function is called when a subclass knows of a target @track that this
136 * @stream can provide.
139 gst_adaptive_demux2_stream_add_track (GstAdaptiveDemux2Stream * stream,
140 GstAdaptiveDemuxTrack * track)
142 g_return_val_if_fail (track != NULL, FALSE);
144 GST_DEBUG_OBJECT (stream->demux, "stream:%p track:%s", stream,
146 if (g_list_find (stream->tracks, track)) {
147 GST_DEBUG_OBJECT (stream->demux,
148 "track '%s' already handled by this stream", track->stream_id);
152 if (stream->demux->buffering_low_watermark_time)
153 track->buffering_threshold = stream->demux->buffering_low_watermark_time;
154 else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
155 track->buffering_threshold =
156 MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
158 /* Using a starting default, can be overridden later in
159 * ::update_stream_info() */
160 GST_DEBUG_OBJECT (stream,
161 "Setting default 10s buffering threshold on new track");
162 track->buffering_threshold = 10 * GST_SECOND;
/* The entry appended to the tracks list carries its own reference on the
 * track (gst_adaptive_demux_track_ref); the stream's finalize handler frees
 * that list with gst_adaptive_demux_track_unref. */
166 g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
/* The stream has to belong to a period by now; the track is registered
 * with that period as well. */
168 g_assert (stream->period);
169 gst_adaptive_demux_period_add_track (stream->period, track);
175 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream);
177 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream);
179 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
182 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
183 GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
/* Debug-only helper, compiled only when GST_DISABLE_GST_DEBUG is not defined.
 * Its result is passed as the "%s" label to log statements in this file.
 * It checks the stream's downloading_header flag first, then
 * downloading_index. */
186 #ifndef GST_DISABLE_GST_DEBUG
188 uritype (GstAdaptiveDemux2Stream * s)
190 if (s->downloading_header)
192 if (s->downloading_index)
198 /* Schedules another chunked download (returns TRUE) or FALSE if no more chunks */
/* The next byte range starts right after the request that just completed
 * (stream->download_request). Its end and the chunk size come from
 * stream->fragment (range_end / chunk_size), which the sub-class may have
 * updated in the meantime. If starting the follow-up download does not
 * return GST_FLOW_OK, the stream is stopped. */
200 schedule_another_chunk (GstAdaptiveDemux2Stream * stream)
202 GstAdaptiveDemux *demux = stream->demux;
203 DownloadRequest *request = stream->download_request;
206 gchar *uri = request->uri;
207 gint64 range_start = request->range_start;
208 gint64 range_end = request->range_end;
213 return FALSE; /* This was a request to the end, no more to load */
215 /* The size of the request that just completed: */
216 chunk_size = range_end + 1 - range_start;
218 if (request->content_received < chunk_size)
219 return FALSE; /* Short read - we're done */
221 /* Accumulate the data we just fetched, to figure out the next
222 * request start position and update the target chunk size from
223 * the updated stream fragment info */
224 range_start += chunk_size;
225 range_end = stream->fragment.range_end;
226 chunk_size = stream->fragment.chunk_size;
229 return FALSE; /* Sub-class doesn't want another chunk */
231 /* HTTP ranges are inclusive for the end */
232 if (chunk_size != -1) {
233 chunk_end = range_start + chunk_size - 1;
/* Never request beyond the end of the fragment's own range */
234 if (range_end != -1 && range_end < chunk_end)
235 chunk_end = range_end;
/* No chunk size limit: use the fragment's range_end as-is (it may be -1,
 * presumably meaning "up to the end" - confirm) */
237 chunk_end = range_end;
240 GST_DEBUG_OBJECT (stream,
241 "Starting next chunk %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT
242 " chunk_size %" G_GINT64_FORMAT, uri, range_start, chunk_end, chunk_size);
245 gst_adaptive_demux2_stream_begin_download_uri (demux, stream, uri,
246 range_start, chunk_end);
247 if (ret != GST_FLOW_OK) {
248 GST_DEBUG_OBJECT (stream,
249 "Stopping stream due to begin download failure - ret %s",
250 gst_flow_get_name (ret));
251 gst_adaptive_demux2_stream_stop (stream);
/* Walks the tracks of @stream and drains every track that is not selected
 * up to the demuxer's global output position, using
 * gst_adaptive_demux_track_drain_to(). Ends with TRACKS_UNLOCK(). */
259 drain_inactive_tracks (GstAdaptiveDemux * demux,
260 GstAdaptiveDemux2Stream * stream)
265 for (iter = stream->tracks; iter; iter = iter->next) {
266 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
267 if (!track->selected) {
268 gst_adaptive_demux_track_drain_to (track,
269 demux->priv->global_output_position);
273 TRACKS_UNLOCK (demux);
276 /* Called to complete a download, either due to failure or completion
277 * Should set up the next download if necessary */
/* @ret is the flow return the download ended with; it is stored in
 * stream->last_ret. A copy of @err is kept in stream->last_error.
 * Depending on @ret the stream is stopped, lost sync is handled, the
 * fragment is finished through the sub-class finish_fragment() vfunc, or
 * another chunk is scheduled. When nothing stopped the stream, a call to
 * gst_adaptive_demux2_stream_next_download() is queued on the scheduler
 * task. */
279 gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
280 stream, GstFlowReturn ret, GError * err)
282 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
283 GstAdaptiveDemux *demux = stream->demux;
285 GST_DEBUG_OBJECT (stream,
286 "%s download finish: %d %s - err: %p", uritype (stream), ret,
287 gst_flow_get_name (ret), err);
289 stream->download_finished = TRUE;
291 /* finish_fragment might call gst_adaptive_demux2_stream_advance_fragment,
292 * which can look at the last_ret - so make sure it's stored before calling that.
293 * Also, for not-linked or other errors passed in that are going to make
294 * this stream stop, we'll need to store it */
295 stream->last_ret = ret;
298 g_clear_error (&stream->last_error);
299 stream->last_error = g_error_copy (err);
302 /* For actual errors, stop now, no need to call finish_fragment and get
303 * confused if it returns a non-error status, but if EOS was passed in,
304 * continue and check whether finish_fragment() says we've finished
305 * the whole manifest or just this fragment */
306 if (ret < 0 && ret != GST_FLOW_EOS) {
307 GST_INFO_OBJECT (stream,
308 "Stopping stream due to error ret %s", gst_flow_get_name (ret));
309 gst_adaptive_demux2_stream_stop (stream);
313 /* Handle all the possible flow returns here: */
314 if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
315 /* We lost sync, seek back to live and return */
316 GST_WARNING_OBJECT (stream, "Lost sync when downloading");
317 gst_adaptive_demux_handle_lost_sync (demux);
319 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
320 /* The sub-class wants to stop the fragment immediately */
321 stream->fragment.finished = TRUE;
322 ret = klass->finish_fragment (demux, stream);
324 GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
325 gst_flow_get_name (ret));
326 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_RESTART_FRAGMENT) {
327 GST_DEBUG_OBJECT (stream, "Restarting download as requested");
328 /* Just mark the fragment as finished */
329 stream->fragment.finished = TRUE;
/* The fragment is complete when the sub-class has no need_another_chunk()
 * vfunc, when fragment.chunk_size is -1 or 0, or when the sub-class says it
 * does not need another chunk */
331 } else if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
332 || !klass->need_another_chunk (stream)
333 || stream->fragment.chunk_size == 0) {
334 stream->fragment.finished = TRUE;
335 ret = klass->finish_fragment (stream->demux, stream);
337 GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
338 gst_flow_get_name (ret));
339 } else if (stream->fragment.chunk_size != 0
340 && schedule_another_chunk (stream)) {
341 /* Another download has already begun, no need to queue anything below */
345 /* For HLS, we might be enqueueing data into tracks that aren't
346 * selected. Drain those ones out */
347 drain_inactive_tracks (stream->demux, stream);
349 /* Now that we've called finish_fragment we can clear these flags the
350 * sub-class might have checked */
351 if (stream->downloading_header) {
352 stream->need_header = FALSE;
353 stream->downloading_header = FALSE;
354 } else if (stream->downloading_index) {
355 stream->need_index = FALSE;
356 stream->downloading_index = FALSE;
357 /* Restart the fragment again now that header + index were loaded
358 * so that get_fragment_info() will be called again */
359 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
361 /* Finishing a fragment data download. Try for another */
362 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
365 /* if GST_FLOW_EOS was passed in that means this download is finished,
366 * but it's the result returned from finish_fragment() we really care
367 * about, as that tells us if the manifest has run out of fragments
369 if (ret == GST_FLOW_EOS) {
370 stream->last_ret = ret;
372 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
376 /* Now finally, if ret is anything other than success, we should stop this
379 GST_DEBUG_OBJECT (stream,
380 "Stopping stream due to finish fragment ret %s",
381 gst_flow_get_name (ret));
382 gst_adaptive_demux2_stream_stop (stream);
386 /* Clear the last_ret marker before starting a fresh download */
387 stream->last_ret = GST_FLOW_OK;
389 GST_LOG_OBJECT (stream, "Scheduling next_download() call");
/* The queued callback holds its own reference on the stream (gst_object_ref),
 * dropped again through the gst_object_unref destroy notify */
390 stream->pending_cb_id =
391 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
392 (GSourceFunc) gst_adaptive_demux2_stream_next_download,
393 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
396 /* Must be called from the scheduler context */
/* Aborts the download in progress after a parse error.
 * The stream state is checked against DOWNLOADING first (the body of that
 * check is presumably an early return - confirm). The current request is
 * then cancelled through the download helper and replaced by a fresh one,
 * and the download is completed with GST_FLOW_CUSTOM_ERROR via
 * gst_adaptive_demux2_stream_finish_download(). */
398 gst_adaptive_demux2_stream_parse_error (GstAdaptiveDemux2Stream * stream,
401 GstAdaptiveDemux *demux = stream->demux;
403 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
406 downloadhelper_cancel_request (demux->download_helper,
407 stream->download_request);
409 /* cancellation is async, so recycle our download request to avoid races */
410 download_request_unref (stream->download_request);
411 stream->download_request = download_request_new ();
413 gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_CUSTOM_ERROR,
/* Builds stream->parse_segment for @stream.
 * It starts as a copy of demux->segment and is then adjusted using the
 * current period start time and the stream's presentation offset. One of
 * three cases applies: @first_and_live is TRUE, the demuxer segment starts
 * after the period start, or neither. Finally stream->send_segment is set
 * to TRUE. */
418 gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
419 GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
421 GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
422 GstClockTime offset =
423 gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
425 stream->parse_segment = demux->segment;
427 /* The demuxer segment is just built from seek events, but for each stream
428 * we have to adjust segments according to the current period and the
429 * stream specific presentation time offset.
431 * For each period, buffer timestamps start again from 0. Additionally the
432 * buffer timestamps are shifted by the stream specific presentation time
433 * offset, so the first buffer timestamp of a period is 0 + presentation
434 * time offset. If the stream contains timestamps itself, this is also
435 * supposed to be the presentation time stored inside the stream.
437 * The stream time over periods is supposed to be continuous, that is the
438 * buffer timestamp 0 + presentation time offset should map to the start
439 * time of the current period.
442 * The adjustment of the stream segments as such works the following.
444 * If the demuxer segment start is bigger than the period start, this
445 * means that we have to drop some media at the beginning of the current
446 * period, e.g. because a seek into the middle of the period has
447 * happened. The amount of media to drop is the difference between the
448 * period start and the demuxer segment start, and as each period starts
449 * again from 0, this difference is going to be the actual stream's
450 * segment start. As all timestamps of the stream are shifted by the
451 * presentation time offset, we will also have to move the segment start
454 * Likewise, the demuxer segment stop value is adjusted in the same
457 * Now the running time and stream time at the stream's segment start has
458 * to be the one that is stored inside the demuxer's segment, which means
459 * that segment.base and segment.time have to be copied over (done just
463 * If the demuxer segment start is smaller than the period start time,
464 * this means that the whole period is inside the segment. As each period
465 * starts timestamps from 0, and additionally timestamps are shifted by
466 * the presentation time offset, the stream's first timestamp (and as such
467 * the stream's segment start) has to be the presentation time offset.
468 * The stream time at the segment start is supposed to be the stream time
469 * of the period start according to the demuxer segment, so the stream
470 * segment's time would be set to that. The same goes for the stream
471 * segment's base, which is supposed to be the running time of the period
472 * start according to the demuxer's segment.
474 * The same logic applies for negative rates with the segment stop and
475 * the period stop time (which gets clamped).
478 * For the first case where not the complete period is inside the segment,
479 * the segment time and base as calculated by the second case would be
482 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
484 GST_DEBUG_OBJECT (demux,
485 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
486 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
488 * Since stream->parse_segment is initially a copy of demux->segment,
489 * only the values that need updating are modified below. */
490 if (first_and_live) {
491 /* If first and live, demuxer did seek to the current position already */
492 stream->parse_segment.start = demux->segment.start - period_start + offset;
493 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
494 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
495 /* FIXME : Do we need to handle negative rates for this ? */
496 stream->parse_segment.position = stream->parse_segment.start;
497 } else if (demux->segment.start > period_start) {
498 /* seek within a period */
499 stream->parse_segment.start = demux->segment.start - period_start + offset;
500 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
501 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
502 if (stream->parse_segment.rate >= 0)
503 stream->parse_segment.position = offset;
505 stream->parse_segment.position = stream->parse_segment.stop;
/* Remaining case: the stream segment starts at the presentation offset, and
 * base and time are recomputed from the demuxer segment */
507 stream->parse_segment.start = offset;
508 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
509 stream->parse_segment.stop = demux->segment.stop - period_start + offset;
510 if (stream->parse_segment.rate >= 0) {
511 stream->parse_segment.position = offset;
512 stream->parse_segment.base =
513 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
516 stream->parse_segment.position = stream->parse_segment.stop;
517 stream->parse_segment.base =
518 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
519 period_start + demux->segment.stop - demux->segment.start);
521 stream->parse_segment.time =
522 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
/* Flag the stream so that the freshly prepared segment gets sent */
526 stream->send_segment = TRUE;
528 GST_DEBUG_OBJECT (stream, "Prepared segment %" GST_SEGMENT_FORMAT,
529 &stream->parse_segment);
532 /* Segment lock hold */
/* Sets the PTS of @buffer from stream->fragment.stream_time.
 * If that time is not valid the PTS becomes GST_CLOCK_TIME_NONE. The
 * stream's presentation offset is looked up for the valid case, and a
 * warning is logged when the position is clamped to 0. */
534 update_buffer_pts_and_demux_position_locked (GstAdaptiveDemux * demux,
535 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
537 GstClockTimeDiff pos;
539 GST_DEBUG_OBJECT (stream, "stream->fragment.stream_time %" GST_STIME_FORMAT,
540 GST_STIME_ARGS (stream->fragment.stream_time));
542 pos = stream->fragment.stream_time;
544 if (GST_CLOCK_STIME_IS_VALID (pos)) {
545 GstClockTime offset =
546 gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
551 GST_WARNING_OBJECT (stream, "Clamping segment and buffer position to 0");
555 GST_BUFFER_PTS (buffer) = pos;
557 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
560 GST_DEBUG_OBJECT (stream, "Buffer/stream position is now: %" GST_TIME_FORMAT,
561 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
564 /* Must be called from the scheduler context */
/* Pushes @buffer into the stream's parsebin sink pad with gst_pad_chain().
 * Before the buffer, any pending STREAM_START, CAPS, SEGMENT and TAG events
 * and the events queued in stream->pending_events are sent to the same pad.
 * If stream->compute_segment is set, the parse segment is prepared first.
 * For a buffer flagged DROPPABLE a GAP event is created from its PTS and
 * duration and sent after the buffer has been chained.
 * Returns GST_FLOW_FLUSHING when the stream was cancelled; otherwise the
 * result of gst_pad_chain() is logged as the push result. */
566 gst_adaptive_demux2_stream_push_buffer (GstAdaptiveDemux2Stream * stream,
569 GstAdaptiveDemux *demux = stream->demux;
570 GstFlowReturn ret = GST_FLOW_OK;
571 gboolean discont = FALSE;
573 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags =
574 NULL, *stream_start = NULL, *buffer_gap = NULL;
575 GList *pending_events = NULL;
577 if (stream->compute_segment) {
578 gst_adaptive_demux2_stream_prepare_segment (demux, stream,
579 stream->first_and_live);
580 stream->compute_segment = FALSE;
581 stream->first_and_live = FALSE;
584 if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) {
585 GST_DEBUG_OBJECT (stream, "Creating gap event for droppable buffer");
587 gst_event_new_gap (GST_BUFFER_PTS (buffer),
588 GST_BUFFER_DURATION (buffer));
591 if (stream->first_fragment_buffer) {
592 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
593 if (demux->segment.rate < 0)
594 /* Set DISCONT flag for every first buffer in reverse playback mode
595 * as each fragment for its own has to be reversed */
597 update_buffer_pts_and_demux_position_locked (demux, stream, buffer);
598 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
600 GST_LOG_OBJECT (stream, "Handling initial buffer %" GST_PTR_FORMAT, buffer);
602 /* Do we need to inject STREAM_START and SEGMENT events ?
604 * This can happen when a stream is restarted, and also when switching to a
605 * variant which needs a header (in which case downloading_header will be
608 if (G_UNLIKELY (stream->send_segment || stream->downloading_header)) {
609 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
610 pending_segment = gst_event_new_segment (&stream->parse_segment);
611 gst_event_set_seqnum (pending_segment, demux->priv->segment_seqnum);
612 stream->send_segment = FALSE;
613 GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, pending_segment);
614 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
615 stream_start = gst_event_new_stream_start ("bogus");
616 if (demux->have_group_id)
617 gst_event_set_group_id (stream_start, demux->group_id);
620 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
622 stream->first_fragment_buffer = FALSE;
624 if (stream->discont) {
626 stream->discont = FALSE;
630 GST_DEBUG_OBJECT (stream, "Marking fragment as discontinuous");
631 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
633 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
636 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
637 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
638 if (G_UNLIKELY (stream->pending_caps)) {
639 pending_caps = gst_event_new_caps (stream->pending_caps);
640 gst_caps_unref (stream->pending_caps);
641 stream->pending_caps = NULL;
644 if (G_UNLIKELY (stream->pending_tags)) {
645 GstTagList *tags = stream->pending_tags;
647 stream->pending_tags = NULL;
650 pending_tags = gst_event_new_tag (tags);
652 if (G_UNLIKELY (stream->pending_events)) {
653 pending_events = stream->pending_events;
654 stream->pending_events = NULL;
657 /* Do not push events or buffers holding the manifest lock */
658 if (G_UNLIKELY (stream_start)) {
659 GST_DEBUG_OBJECT (stream,
660 "Setting stream start: %" GST_PTR_FORMAT, stream_start);
661 gst_pad_send_event (stream->parsebin_sink, stream_start);
663 if (G_UNLIKELY (pending_caps)) {
664 GST_DEBUG_OBJECT (stream,
665 "Setting pending caps: %" GST_PTR_FORMAT, pending_caps);
666 gst_pad_send_event (stream->parsebin_sink, pending_caps);
668 if (G_UNLIKELY (pending_segment)) {
669 GST_DEBUG_OBJECT (stream,
670 "Sending pending seg: %" GST_PTR_FORMAT, pending_segment);
671 gst_pad_send_event (stream->parsebin_sink, pending_segment);
673 if (G_UNLIKELY (pending_tags)) {
674 GST_DEBUG_OBJECT (stream,
675 "Sending pending tags: %" GST_PTR_FORMAT, pending_tags);
676 gst_pad_send_event (stream->parsebin_sink, pending_tags);
/* Send the queued events in list order, removing each list link once its
 * event has been handed to the pad */
678 while (pending_events != NULL) {
679 GstEvent *event = pending_events->data;
681 GST_DEBUG_OBJECT (stream, "Sending pending event: %" GST_PTR_FORMAT, event);
682 if (!gst_pad_send_event (stream->parsebin_sink, event))
683 GST_ERROR_OBJECT (stream, "Failed to send pending event");
685 pending_events = g_list_delete_link (pending_events, pending_events);
688 GST_DEBUG_OBJECT (stream,
689 "About to push buffer of size %" G_GSIZE_FORMAT " offset %"
690 G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
691 GST_BUFFER_OFFSET (buffer));
693 ret = gst_pad_chain (stream->parsebin_sink, buffer);
696 GST_DEBUG_OBJECT (stream, "Sending %" GST_PTR_FORMAT, buffer_gap);
697 gst_pad_send_event (stream->parsebin_sink, buffer_gap);
700 if (G_UNLIKELY (stream->cancelled)) {
701 GST_LOG_OBJECT (demux, "Stream was cancelled");
702 return GST_FLOW_FLUSHING;
705 GST_LOG_OBJECT (stream, "Push result: %d %s", ret, gst_flow_get_name (ret));
/* Handles one buffer of downloaded data for @stream.
 * A cancelled stream unrefs the buffer and returns GST_FLOW_FLUSHING.
 * On the first buffer of a fragment (stream->starting_fragment) the
 * sub-class start_fragment() vfunc is called, if there is one;
 * GST_FLOW_ERROR is returned when it fails. Otherwise the buffer size is
 * added to stream->download_total_bytes and the buffer is passed to the
 * sub-class data_received() vfunc.
 * A flow return below GST_FLOW_EOS posts a flow error on the demuxer, sends
 * EOS to the parsebin sink pad and puts the stream in the ERRORED state.
 *
 * NOTE(review): on the start_fragment() failure path the buffer is not
 * visibly unreffed, unlike on the cancelled path - confirm whether it is
 * leaked there. */
711 gst_adaptive_demux2_stream_parse_buffer (GstAdaptiveDemux2Stream * stream,
714 GstAdaptiveDemux *demux = stream->demux;
715 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
716 GstFlowReturn ret = GST_FLOW_OK;
718 /* do not make any changes if the stream is cancelled */
719 if (G_UNLIKELY (stream->cancelled)) {
720 gst_buffer_unref (buffer);
721 return GST_FLOW_FLUSHING;
724 /* starting_fragment is set to TRUE at the beginning of
725 * _stream_download_fragment()
726 * /!\ If there is a header/index being downloaded, then this will
727 * be TRUE for the first one ... but FALSE for the remaining ones,
728 * including the *actual* fragment ! */
729 if (stream->starting_fragment) {
730 stream->starting_fragment = FALSE;
731 if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
732 return GST_FLOW_ERROR;
735 stream->download_total_bytes += gst_buffer_get_size (buffer);
737 GST_TRACE_OBJECT (stream,
738 "Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
739 gst_buffer_get_size (buffer));
741 ret = klass->data_received (demux, stream, buffer);
743 if (ret != GST_FLOW_OK) {
744 GST_DEBUG_OBJECT (stream, "data_received returned %s",
745 gst_flow_get_name (ret));
747 if (ret == GST_FLOW_FLUSHING) {
748 /* do not make any changes if the stream is cancelled */
749 if (G_UNLIKELY (stream->cancelled)) {
/* Flow returns below GST_FLOW_EOS are treated as fatal for this stream */
754 if (ret < GST_FLOW_EOS) {
755 GstEvent *eos = gst_event_new_eos ();
756 GST_ELEMENT_FLOW_ERROR (demux, ret);
758 GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
760 /* TODO push this on all pads */
761 gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
762 gst_pad_send_event (stream->parsebin_sink, eos);
763 ret = GST_FLOW_ERROR;
765 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
/* Writes the buffering thresholds for @stream to *low_threshold and
 * *high_threshold. @fragment_duration converts the demuxer's
 * fragment-count watermarks into time. The demuxer properties are read
 * between GST_OBJECT_LOCK() and GST_OBJECT_UNLOCK(). If both low watermark
 * properties are 0, the low threshold defaults to 10 seconds, capped by
 * stream->recommended_buffering_threshold when that is valid. */
772 /* Calculate the low and high download buffering watermarks
773 * in time as MAX (low-watermark-time, low-watermark-fragments) and
774 * MIN (high-watermark-time, high-watermark-fragments) respectively
777 calculate_track_thresholds (GstAdaptiveDemux * demux,
778 GstAdaptiveDemux2Stream * stream,
779 GstClockTime fragment_duration, GstClockTime * low_threshold,
780 GstClockTime * high_threshold)
782 GST_OBJECT_LOCK (demux);
783 *low_threshold = demux->buffering_low_watermark_fragments * fragment_duration;
784 if (*low_threshold == 0 ||
785 (demux->buffering_low_watermark_time != 0
786 && demux->buffering_low_watermark_time > *low_threshold)) {
787 *low_threshold = demux->buffering_low_watermark_time;
790 if (*low_threshold == 0) {
791 /* This implies both low level properties were 0, the default is 10s unless
792 * the subclass has specified a recommended buffering threshold */
793 *low_threshold = 10 * GST_SECOND;
794 if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
796 MIN (stream->recommended_buffering_threshold, *low_threshold);
800 demux->buffering_high_watermark_fragments * fragment_duration;
801 if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
802 && demux->buffering_high_watermark_time < *high_threshold)) {
803 *high_threshold = demux->buffering_high_watermark_time;
806 /* Make sure the low and high thresholds are less than the maximum buffering
808 if (*high_threshold == 0 ||
809 (demux->max_buffering_time != 0
810 && demux->max_buffering_time < *high_threshold)) {
811 *high_threshold = demux->max_buffering_time;
814 if (*low_threshold == 0 ||
815 (demux->max_buffering_time != 0
816 && demux->max_buffering_time < *low_threshold)) {
817 *low_threshold = demux->max_buffering_time;
820 /* Make sure the high threshold is higher than (or equal to) the low threshold.
821 * It's OK if they are the same, as the minimum download is 1 fragment */
822 if (*high_threshold == 0 ||
823 (*low_threshold != 0 && *low_threshold > *high_threshold)) {
824 *high_threshold = *low_threshold;
827 GST_OBJECT_UNLOCK (demux);
830 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
/* Decides whether downloading for @stream has to wait for its tracks to
 * drain (tracked in need_to_wait, presumably also the return value -
 * confirm).
 * The thresholds come from calculate_track_thresholds(). A track's
 * buffering_threshold is updated when the low threshold differs from it by
 * more than a second; in that case the demuxer buffering state is
 * recomputed and posted before TRACKS_UNLOCK().
 * While walking the tracks, stream->next_input_wakeup_time is set to the
 * smallest (track->input_time - high_threshold), and the period's wakeup
 * time is lowered to match if needed. */
832 gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
833 GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
835 gboolean need_to_wait = TRUE;
836 gboolean have_any_tracks = FALSE;
837 gboolean have_active_tracks = FALSE;
838 gboolean have_filled_inactive = FALSE;
839 gboolean update_buffering = FALSE;
841 GstClockTime low_threshold = 0, high_threshold = 0;
844 calculate_track_thresholds (demux, stream, fragment_duration,
845 &low_threshold, &high_threshold);
846 GST_DEBUG_OBJECT (stream,
847 "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
848 " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
849 GST_TIME_ARGS (high_threshold),
850 GST_TIME_ARGS (stream->recommended_buffering_threshold));
852 /* If there are no tracks at all, don't wait. If there are no active
853 * tracks, keep filling until at least one track is full. If there
854 * are active tracks, require that they are all full */
856 for (iter = stream->tracks; iter; iter = iter->next) {
857 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
859 /* Update the buffering threshold if it changed by more than a second */
860 if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
861 GST_DEBUG_OBJECT (stream, "Updating threshold");
862 /* The buffering threshold for this track changed, make sure to
863 * re-check buffering status */
864 update_buffering = TRUE;
865 track->buffering_threshold = low_threshold;
868 have_any_tracks = TRUE;
870 have_active_tracks = TRUE;
872 if (track->level_time < high_threshold) {
874 need_to_wait = FALSE;
875 GST_DEBUG_OBJECT (stream,
876 "track %s has level %" GST_TIME_FORMAT
877 " - needs more data (target %" GST_TIME_FORMAT
878 ") (fragment duration %" GST_TIME_FORMAT ")",
879 track->stream_id, GST_TIME_ARGS (track->level_time),
880 GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
883 } else if (!track->active) { /* track is over threshold and inactive */
884 have_filled_inactive = TRUE;
887 GST_DEBUG_OBJECT (stream,
888 "track %s active (%d) has level %" GST_TIME_FORMAT,
889 track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
892 /* If there are no tracks, don't wait (we might need data to create them),
893 * or if there are active tracks that need more data to hit the threshold,
894 * don't wait. Otherwise it means all active tracks are full and we should wait */
895 if (!have_any_tracks) {
896 GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
897 need_to_wait = FALSE;
898 } else if (!have_active_tracks && !have_filled_inactive) {
899 GST_DEBUG_OBJECT (stream,
900 "have only inactive tracks that need more data - not waiting");
901 need_to_wait = FALSE;
905 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
907 for (iter = stream->tracks; iter; iter = iter->next) {
908 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
910 GST_DEBUG_OBJECT (stream,
911 "Waiting for queued data on track %s to drop below %"
912 GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
913 track->stream_id, GST_TIME_ARGS (high_threshold),
914 GST_TIME_ARGS (fragment_duration));
916 /* we want to get woken up when the global output position reaches
917 * a point where the input is closer than "high_threshold" to needing
918 * output, so we can put more data in */
919 GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
921 if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
922 wakeup_time < stream->next_input_wakeup_time) {
923 stream->next_input_wakeup_time = wakeup_time;
925 GST_DEBUG_OBJECT (stream,
926 "Track %s level %" GST_TIME_FORMAT ". Input at position %"
927 GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
928 GST_TIME_FORMAT, track->stream_id,
929 GST_TIME_ARGS (track->level_time),
930 GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
931 GST_TIME_ARGS (demux->priv->global_output_position));
/* NOTE(review): this check uses GST_CLOCK_TIME_NONE, while the same field is
 * reset to and compared against GST_CLOCK_STIME_NONE just above - confirm
 * which constant is intended */
935 if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
936 GST_DEBUG_OBJECT (stream,
937 "Next input wakeup time is now %" GST_TIME_FORMAT,
938 GST_TIME_ARGS (stream->next_input_wakeup_time));
940 /* If this stream needs waking up sooner than any other current one,
941 * update the period wakeup time, which is what the output loop
943 GstAdaptiveDemuxPeriod *period = stream->period;
944 if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
945 period->next_input_wakeup_time > stream->next_input_wakeup_time) {
946 period->next_input_wakeup_time = stream->next_input_wakeup_time;
951 if (update_buffering) {
952 demux_update_buffering_locked (demux);
953 demux_post_buffering_locked (demux);
956 TRACKS_UNLOCK (demux);
961 static GstAdaptiveDemuxTrack *
962 match_parsebin_to_track (GstAdaptiveDemux2Stream * stream, GstPad * pad)
965 GstAdaptiveDemuxTrack *found_track = NULL, *first_matched_track = NULL;
966 gint num_possible_tracks = 0;
967 GstStream *gst_stream;
968 const gchar *internal_stream_id;
969 GstStreamType stream_type;
971 gst_stream = gst_pad_get_stream (pad);
973 /* FIXME: Edward: Added assertion because I don't see in what cases we would
974 * end up with a pad from parsebin which wouldn't have an associated
976 g_assert (gst_stream);
978 internal_stream_id = gst_stream_get_stream_id (gst_stream);
979 stream_type = gst_stream_get_stream_type (gst_stream);
981 GST_DEBUG_OBJECT (pad,
982 "Trying to match pad from parsebin with internal streamid %s and caps %"
983 GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
984 gst_stream_get_caps (gst_stream));
986 /* Try to match directly by the track's pending upstream_stream_id */
987 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
988 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
990 if (stream_type != GST_STREAM_TYPE_UNKNOWN && track->type != stream_type)
993 GST_DEBUG_OBJECT (pad, "track upstream_stream_id: %s",
994 track->upstream_stream_id);
996 if (first_matched_track == NULL)
997 first_matched_track = track;
998 num_possible_tracks++;
1000 /* If this track has a desired upstream stream id, match on it */
1001 if (track->upstream_stream_id == NULL ||
1002 g_strcmp0 (track->upstream_stream_id, internal_stream_id)) {
1003 /* This is not the track for this pad */
1007 /* Remove pending upstream id (we have matched it for the pending
1009 g_free (track->upstream_stream_id);
1010 track->upstream_stream_id = NULL;
1011 found_track = track;
1015 if (found_track == NULL) {
1016 /* If we arrive here, it means the stream is switching pads after
1017 * the stream has already started running */
1018 /* No track is currently waiting for this particular stream id -
1019 * try and match an existing linked track. If there's only 1 possible,
1021 if (num_possible_tracks == 1 && first_matched_track != NULL) {
1022 GST_LOG_OBJECT (pad, "Only one possible track to link to");
1023 found_track = first_matched_track;
1027 if (found_track == NULL) {
1028 /* TODO: There are multiple possible tracks, need to match based
1029 * on language code and caps. Have you found a stream like this? */
1030 GST_FIXME_OBJECT (pad, "Need to match track based on caps and language");
1033 if (found_track != NULL) {
1034 if (!gst_pad_is_linked (found_track->sinkpad)) {
1035 GST_LOG_OBJECT (pad, "Linking to track pad %" GST_PTR_FORMAT,
1036 found_track->sinkpad);
1038 if (gst_pad_link (pad, found_track->sinkpad) != GST_PAD_LINK_OK) {
1039 GST_ERROR_OBJECT (pad, "Couldn't connect to track sinkpad");
1040 /* FIXME : Do something if we can't link ? */
1043 /* Store pad as pending link */
1044 GST_LOG_OBJECT (pad,
1045 "Remembering pad to be linked when current pad is unlinked");
1046 g_assert (found_track->pending_srcpad == NULL);
1047 found_track->pending_srcpad = gst_object_ref (pad);
1052 gst_object_unref (gst_stream);
1058 parsebin_pad_removed_cb (GstElement * parsebin, GstPad * pad,
1059 GstAdaptiveDemux2Stream * stream)
1062 GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1064 /* Remove from pending source pad */
1065 TRACKS_LOCK (stream->demux);
1066 for (iter = stream->tracks; iter; iter = iter->next) {
1067 GstAdaptiveDemuxTrack *track = iter->data;
1068 if (track->pending_srcpad == pad) {
1069 gst_object_unref (track->pending_srcpad);
1070 track->pending_srcpad = NULL;
1074 TRACKS_UNLOCK (stream->demux);
1078 parsebin_pad_added_cb (GstElement * parsebin, GstPad * pad,
1079 GstAdaptiveDemux2Stream * stream)
1081 if (!GST_PAD_IS_SRC (pad))
1084 GST_DEBUG_OBJECT (stream, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1086 if (!match_parsebin_to_track (stream, pad))
1087 GST_WARNING_OBJECT (pad, "Found no track to handle pad");
1089 GST_DEBUG_OBJECT (stream->demux, "Done linking");
1093 parsebin_deep_element_added_cb (GstBin * parsebin, GstBin * unused,
1094 GstElement * element, GstAdaptiveDemux * demux)
1096 if (G_OBJECT_TYPE (element) == tsdemux_type) {
1097 GST_DEBUG_OBJECT (demux, "Overriding tsdemux ignore-pcr to TRUE");
1098 g_object_set (element, "ignore-pcr", TRUE, NULL);
1102 /* must be called with manifest_lock taken */
1104 gst_adaptive_demux2_stream_create_parser (GstAdaptiveDemux2Stream * stream)
1106 GstAdaptiveDemux *demux = stream->demux;
1108 if (stream->parsebin == NULL) {
1111 GST_DEBUG_OBJECT (demux, "Setting up new parsing source");
1113 /* Workaround to detect if tsdemux is being used */
1114 if (tsdemux_type == 0) {
1115 GstElement *element = gst_element_factory_make ("tsdemux", NULL);
1117 tsdemux_type = G_OBJECT_TYPE (element);
1118 gst_object_unref (element);
1122 stream->parsebin = gst_element_factory_make ("parsebin", NULL);
1124 g_signal_connect (stream->parsebin, "deep-element-added",
1125 (GCallback) parsebin_deep_element_added_cb, demux);
1126 gst_bin_add (GST_BIN_CAST (demux), gst_object_ref (stream->parsebin));
1127 stream->parsebin_sink =
1128 gst_element_get_static_pad (stream->parsebin, "sink");
1129 stream->pad_added_id = g_signal_connect (stream->parsebin, "pad-added",
1130 G_CALLBACK (parsebin_pad_added_cb), stream);
1131 stream->pad_removed_id = g_signal_connect (stream->parsebin, "pad-removed",
1132 G_CALLBACK (parsebin_pad_removed_cb), stream);
1134 event = gst_event_new_stream_start ("bogus");
1135 if (demux->have_group_id)
1136 gst_event_set_group_id (event, demux->group_id);
1138 gst_pad_send_event (stream->parsebin_sink, event);
1140 /* Not sure if these need to be outside the manifest lock: */
1141 gst_element_sync_state_with_parent (stream->parsebin);
1142 stream->last_status_code = 200; /* default to OK */
1148 on_download_cancellation (DownloadRequest * request, DownloadRequestState state,
1149 GstAdaptiveDemux2Stream * stream)
/* Error callback installed on the stream's download requests.
 *
 * Marks the download as no longer active and stores the request's HTTP
 * status code on the stream. The handling then depends on the status:
 *
 *  - 5xx, or 4xx while the demuxer is live: the live seek range is
 *    consulted. If the segment position is before the range start, the
 *    current download is finished with EOS and the fragment info is
 *    refreshed. If it is past the range stop, a delayed load_a_fragment()
 *    call is scheduled for when the fragment should become available. Once
 *    download_error_count has reached MAX_DOWNLOAD_ERROR_COUNT the error is
 *    converted into a playlist EOS.
 *  - otherwise, if there is no next fragment, the failure is treated as
 *    playlist EOS as well.
 *
 * The remaining path bumps download_error_count: above the maximum a stream
 * error is posted, otherwise load_a_fragment() is scheduled again after
 * 10 milliseconds.
 *
 * NOTE(review): confirm the exact jumps and early returns between these
 * branches before relying on this summary. In particular, check that the
 * path scheduling the "wait to be in range" callback cannot also reach the
 * retry scheduling at the end, since both assert pending_cb_id == 0. */
1154 on_download_error (DownloadRequest * request, DownloadRequestState state,
1155 GstAdaptiveDemux2Stream * stream)
1157 GstAdaptiveDemux *demux = stream->demux;
1158 guint last_status_code = request->status_code;
1161 stream->download_active = FALSE;
1162 stream->last_status_code = last_status_code;
1164 GST_DEBUG_OBJECT (stream,
1165 "Download finished with error, request state %d http status %u, dc %d",
1166 request->state, last_status_code, stream->download_error_count);
1168 live = gst_adaptive_demux_is_live (demux);
/* Integer division by 100 selects the HTTP status class (4xx / 5xx) */
1169 if (((last_status_code / 100 == 4 && live)
1170 || last_status_code / 100 == 5)) {
1172 /* if current position is before available start, switch to next */
1173 if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
1177 gint64 range_start, range_stop;
1179 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1183 if (demux->segment.position < range_start) {
1186 GST_DEBUG_OBJECT (stream, "Retrying once with next segment");
1187 gst_adaptive_demux2_stream_finish_download (stream, GST_FLOW_EOS, NULL);
1189 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1191 ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1192 GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
1193 gst_flow_get_name (ret));
1195 if (ret == GST_FLOW_OK)
1198 } else if (demux->segment.position > range_stop) {
1199 /* wait a bit to be in range, we don't have any locks at that point */
1200 GstClockTime wait_time =
1201 gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
1203 if (wait_time > 0) {
1204 GST_DEBUG_OBJECT (stream,
1205 "Download waiting for %" GST_TIME_FORMAT,
1206 GST_TIME_ARGS (wait_time));
1207 g_assert (stream->pending_cb_id == 0);
1208 GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
/* The scheduled callback owns a stream reference, dropped via the
 * destroy notify */
1209 stream->pending_cb_id =
1210 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1212 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1213 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1219 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
1220 /* looks like there is no way of knowing when a live stream has ended
1221 * Have to assume we are falling behind and cause a manifest reload */
1222 GST_DEBUG_OBJECT (stream, "Converting error of live stream to EOS");
1223 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1226 } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
1227 /* If this is the last fragment, consider failures EOS and not actual
1228 * errors. Due to rounding errors in the durations, the last fragment
1229 * might not actually exist */
1230 GST_DEBUG_OBJECT (stream, "Converting error for last fragment to EOS");
1231 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1234 /* retry same segment */
1235 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1236 gst_adaptive_demux2_stream_error (stream);
1243 /* wait a short time in case the server needs a bit to recover */
1244 GST_LOG_OBJECT (stream,
1245 "Scheduling delayed load_a_fragment() call to retry in 10 milliseconds");
1246 g_assert (stream->pending_cb_id == 0);
1247 stream->pending_cb_id = gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task, 10 * GST_MSECOND, /* Retry in 10 ms */
1248 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1249 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1253 update_stream_bitrate (GstAdaptiveDemux2Stream * stream,
1254 DownloadRequest * request)
1256 GstClockTimeDiff last_download_duration;
1257 guint64 fragment_bytes_downloaded = request->content_received;
1259 /* The stream last_download time tracks the full download time for now */
1260 stream->last_download_time =
1261 GST_CLOCK_DIFF (request->download_request_time,
1262 request->download_end_time);
1264 /* Here we only track the time the data took to arrive and ignore request delay, so we can estimate bitrate */
1265 last_download_duration =
1266 GST_CLOCK_DIFF (request->download_start_time, request->download_end_time);
1268 /* If the entire response arrived in the first buffer
1269 * though, include the request time to get a valid
1270 * bitrate estimate */
1271 if (last_download_duration < 2 * stream->last_download_time)
1272 last_download_duration = stream->last_download_time;
1274 if (last_download_duration > 0) {
1275 stream->last_bitrate =
1276 gst_util_uint64_scale (fragment_bytes_downloaded,
1277 8 * GST_SECOND, last_download_duration);
1279 GST_DEBUG_OBJECT (stream,
1280 "Updated stream bitrate. %" G_GUINT64_FORMAT
1281 " bytes. download time %" GST_TIME_FORMAT " bitrate %"
1282 G_GUINT64_FORMAT " bps", fragment_bytes_downloaded,
1283 GST_TIME_ARGS (last_download_duration), stream->last_bitrate);
1288 on_download_progress (DownloadRequest * request, DownloadRequestState state,
1289 GstAdaptiveDemux2Stream * stream)
1291 GstAdaptiveDemux *demux = stream->demux;
1292 GstBuffer *buffer = download_request_take_buffer (request);
1297 GST_DEBUG_OBJECT (stream,
1298 "Handling buffer of %" G_GSIZE_FORMAT
1299 " bytes of ongoing download progress - %" G_GUINT64_FORMAT " / %"
1300 G_GUINT64_FORMAT " bytes", gst_buffer_get_size (buffer),
1301 request->content_received, request->content_length);
1303 /* Drop the request lock when parsing data. FIXME: Check and comment why this is needed */
1304 download_request_unlock (request);
1305 ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1306 download_request_lock (request);
1308 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1311 if (ret != GST_FLOW_OK) {
1312 GST_DEBUG_OBJECT (stream,
1313 "Buffer parsing returned: %d %s. Aborting download", ret,
1314 gst_flow_get_name (ret));
1316 if (!stream->downloading_header && !stream->downloading_index)
1317 update_stream_bitrate (stream, request);
1319 downloadhelper_cancel_request (demux->download_helper, request);
1321 /* cancellation is async, so recycle our download request to avoid races */
1322 download_request_unref (stream->download_request);
1323 stream->download_request = download_request_new ();
1325 gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1331 on_download_complete (DownloadRequest * request, DownloadRequestState state,
1332 GstAdaptiveDemux2Stream * stream)
1334 GstFlowReturn ret = GST_FLOW_OK;
1337 stream->download_active = FALSE;
1339 if (G_UNLIKELY (stream->cancelled))
1342 GST_DEBUG_OBJECT (stream,
1343 "Stream %p %s download for %s is complete with state %d",
1344 stream, uritype (stream), request->uri, request->state);
1346 /* Update bitrate for fragment downloads */
1347 if (!stream->downloading_header && !stream->downloading_index)
1348 update_stream_bitrate (stream, request);
1350 buffer = download_request_take_buffer (request);
1352 ret = gst_adaptive_demux2_stream_parse_buffer (stream, buffer);
1354 GST_DEBUG_OBJECT (stream,
1355 "%s download finished: %s ret %d %s. Stream state %d", uritype (stream),
1356 request->uri, ret, gst_flow_get_name (ret), stream->state);
1358 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING)
1361 g_assert (stream->pending_cb_id == 0);
1362 gst_adaptive_demux2_stream_finish_download (stream, ret, NULL);
1365 /* must be called from the scheduler context
1367 * Will submit the request only, which will complete asynchronously
1369 static GstFlowReturn
1370 gst_adaptive_demux2_stream_begin_download_uri (GstAdaptiveDemux * demux,
1371 GstAdaptiveDemux2Stream * stream, const gchar * uri, gint64 start,
1374 DownloadRequest *request = stream->download_request;
1376 GST_DEBUG_OBJECT (demux,
1377 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
1378 uritype (stream), uri, start, end);
1380 if (!gst_adaptive_demux2_stream_create_parser (stream))
1381 return GST_FLOW_ERROR;
1383 /* Configure our download request */
1384 download_request_set_uri (request, uri, start, end);
1386 if (stream->downloading_header || stream->downloading_index) {
1387 download_request_set_callbacks (request,
1388 (DownloadRequestEventCallback) on_download_complete,
1389 (DownloadRequestEventCallback) on_download_error,
1390 (DownloadRequestEventCallback) on_download_cancellation,
1391 (DownloadRequestEventCallback) NULL, stream);
1393 download_request_set_callbacks (request,
1394 (DownloadRequestEventCallback) on_download_complete,
1395 (DownloadRequestEventCallback) on_download_error,
1396 (DownloadRequestEventCallback) on_download_cancellation,
1397 (DownloadRequestEventCallback) on_download_progress, stream);
1400 if (!downloadhelper_submit_request (demux->download_helper,
1401 demux->manifest_uri, DOWNLOAD_FLAG_NONE, request, NULL))
1402 return GST_FLOW_ERROR;
1404 stream->download_active = TRUE;
/* Start the next download needed for the stream's current fragment.
 *
 * On the first call for a fragment (starting_fragment set) the stream is
 * switched to the DOWNLOADING state and first_fragment_buffer is flagged.
 * Downloads are then started in this priority order, one per call:
 *
 *  1. the header, when need_header is set and the fragment has a header_uri
 *     (need_index is also re-armed here if there is an index_uri);
 *  2. the index, when need_index is set and the fragment has an index_uri;
 *  3. the fragment data itself: as a ranged chunk when the subclass'
 *     need_another_chunk() asks for one and chunk_size is non-zero,
 *     otherwise as a single request over range_start..range_end.
 *
 * Returns the result of begin_download_uri() for the request that was
 * started. The block at the bottom posts a "Failed to get fragment URL"
 * element error and returns GST_FLOW_ERROR.
 *
 * NOTE(review): the error block is presumably entered when the fragment has
 * neither uri, header_uri nor index_uri - confirm the jump. Also confirm
 * what is returned when only fragment.uri is NULL after header/index
 * handling. */
1409 /* must be called from the scheduler context */
1410 static GstFlowReturn
1411 gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
1413 GstAdaptiveDemux *demux = stream->demux;
1414 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1418 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
1419 if (stream->starting_fragment) {
1420 GST_DEBUG_OBJECT (stream, "Downloading %s%s%s",
1421 stream->fragment.uri ? "FRAGMENT " : "",
1422 stream->need_header && stream->fragment.header_uri ? "HEADER " : "",
1423 stream->need_index && stream->fragment.index_uri ? "INDEX" : "");
/* A fragment with nothing at all to download is an error condition */
1425 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
1426 stream->fragment.index_uri == NULL)
1429 stream->first_fragment_buffer = TRUE;
1430 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING;
1433 if (stream->need_header && stream->fragment.header_uri != NULL) {
1435 /* Set the need_index flag when we start the header if we'll also need the index */
1436 stream->need_index = (stream->fragment.index_uri != NULL);
1438 GST_DEBUG_OBJECT (stream, "Fetching header %s %" G_GINT64_FORMAT "-%"
1439 G_GINT64_FORMAT, stream->fragment.header_uri,
1440 stream->fragment.header_range_start, stream->fragment.header_range_end);
1442 stream->downloading_header = TRUE;
1444 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1445 stream->fragment.header_uri, stream->fragment.header_range_start,
1446 stream->fragment.header_range_end);
1449 /* check if we have an index */
1450 if (stream->need_index && stream->fragment.index_uri != NULL) {
1451 GST_DEBUG_OBJECT (stream,
1452 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1453 stream->fragment.index_uri,
1454 stream->fragment.index_range_start, stream->fragment.index_range_end);
1456 stream->downloading_index = TRUE;
1458 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream,
1459 stream->fragment.index_uri, stream->fragment.index_range_start,
1460 stream->fragment.index_range_end);
1463 url = stream->fragment.uri;
1464 GST_DEBUG_OBJECT (stream, "Got url '%s' for stream %p", url, stream);
1468 /* Download the actual fragment, either in chunks or in one go */
1469 stream->first_fragment_buffer = TRUE;
1471 if (klass->need_another_chunk && klass->need_another_chunk (stream)
1472 && stream->fragment.chunk_size != 0) {
1473 /* Handle chunk downloading */
1474 gint64 range_start = stream->fragment.range_start;
1475 gint64 range_end = stream->fragment.range_end;
1476 gint chunk_size = stream->fragment.chunk_size;
/* A chunk_size of -1 means "no chunk limit": the chunk then ends at
 * range_end. Otherwise the chunk end is clamped to range_end when that is
 * set (not -1). */
1479 /* HTTP ranges are inclusive for the end */
1480 if (chunk_size != -1) {
1481 chunk_end = range_start + chunk_size - 1;
1482 if (range_end != -1 && range_end < chunk_end)
1483 chunk_end = range_end;
1485 chunk_end = range_end;
1488 GST_DEBUG_OBJECT (stream,
1489 "Starting chunked download %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1490 url, range_start, chunk_end);
1491 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1492 range_start, chunk_end);
1495 /* regular single chunk download */
1496 stream->fragment.chunk_size = 0;
1498 return gst_adaptive_demux2_stream_begin_download_uri (demux, stream, url,
1499 stream->fragment.range_start, stream->fragment.range_end);
1503 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1504 (_("Failed to get fragment URL.")),
1505 ("An error happened when getting fragment URL"));
1506 return GST_FLOW_ERROR;
1511 gst_adaptive_demux2_stream_push_event (GstAdaptiveDemux2Stream * stream,
1514 gboolean ret = TRUE;
1517 /* If there's a parsebin, push the event through it */
1518 if (stream->parsebin_sink != NULL) {
1519 pad = gst_object_ref (stream->parsebin_sink);
1520 GST_DEBUG_OBJECT (pad, "Pushing event %" GST_PTR_FORMAT, event);
1521 ret = gst_pad_send_event (pad, gst_event_ref (event));
1522 gst_object_unref (pad);
1525 /* If the event is EOS, ensure that all tracks are EOS. This catches
1526 * the case where the parsebin hasn't parsed anything yet (we switched
1527 * to a never before used track right near EOS, or it didn't parse enough
1528 * to create pads and be able to send EOS through to the tracks.
1530 * We don't need to care about any other events
1532 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
1535 for (iter = stream->tracks; iter; iter = iter->next) {
1536 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1537 ret &= gst_pad_send_event (track->sinkpad, gst_event_ref (event));
1541 gst_event_unref (event);
1546 gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream)
1548 GstAdaptiveDemux *demux = stream->demux;
1550 GstStructure *details;
1552 details = gst_structure_new_empty ("details");
1553 gst_structure_set (details, "http-status-code", G_TYPE_UINT,
1554 stream->last_status_code, NULL);
1556 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_ERRORED;
1558 if (stream->last_error) {
1559 gchar *debug = g_strdup_printf ("Error on stream %s",
1560 GST_OBJECT_NAME (stream));
1562 gst_message_new_error_with_details (GST_OBJECT_CAST (demux),
1563 stream->last_error, debug, details);
1564 GST_ERROR_OBJECT (stream, "Download error: %s",
1565 stream->last_error->message);
1568 GError *err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
1569 _("Couldn't download fragments"));
1571 gst_message_new_error_with_details (GST_OBJECT_CAST (demux), err,
1572 "Fragment downloading has failed consecutive times", details);
1574 GST_ERROR_OBJECT (stream,
1575 "Download error: Couldn't download fragments, too many failures");
1578 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
1581 /* Called when a stream reaches the end of a playback segment */
1583 gst_adaptive_demux2_stream_end_of_manifest (GstAdaptiveDemux2Stream * stream)
1585 GstAdaptiveDemux *demux = stream->demux;
1586 GstFlowReturn combined =
1587 gst_adaptive_demux_period_combine_stream_flows (demux->input_period);
1589 GST_DEBUG_OBJECT (stream, "Combined flow %s", gst_flow_get_name (combined));
1591 if (gst_adaptive_demux_has_next_period (demux)) {
1592 if (combined == GST_FLOW_EOS) {
1593 GST_DEBUG_OBJECT (stream, "Next period available, advancing");
1594 gst_adaptive_demux_advance_period (demux);
1596 /* Ensure the 'has_next_period' flag is set on the period before
1597 * pushing EOS to the stream, so that the output loop knows not
1598 * to actually output the event */
1599 GST_DEBUG_OBJECT (stream, "Marking current period has a next one");
1600 demux->input_period->has_next_period = TRUE;
1604 if (demux->priv->outputs) {
1605 GstEvent *eos = gst_event_new_eos ();
1607 GST_DEBUG_OBJECT (stream, "Stream is EOS. Stopping.");
1608 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
1610 gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
1611 gst_adaptive_demux2_stream_push_event (stream, eos);
1613 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
1614 gst_adaptive_demux2_stream_error (stream);
1619 gst_adaptive_demux2_stream_reload_manifest_cb (GstAdaptiveDemux2Stream * stream)
1621 GstAdaptiveDemux *demux = stream->demux;
1623 gboolean is_live = gst_adaptive_demux_is_live (demux);
1625 stream->pending_cb_id = 0;
1627 /* Refetch the playlist now after we waited */
1628 /* FIXME: Make this manifest update async and handle it on completion */
1629 if (!is_live && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1630 GST_DEBUG_OBJECT (demux, "Updated the playlist");
1633 /* We were called here from a timeout, so if the load function wants to loop
1634 * again, schedule an immediate callback but return G_SOURCE_REMOVE either
1636 while (gst_adaptive_demux2_stream_next_download (stream));
1638 return G_SOURCE_REMOVE;
1642 gst_adaptive_demux2_stream_on_output_space_available_cb (GstAdaptiveDemux2Stream
1645 /* If the state already moved on, the stream was stopped, or another track
1646 * already woke up and needed data */
1647 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
1648 return G_SOURCE_REMOVE;
1650 GstAdaptiveDemux *demux = stream->demux;
1651 TRACKS_LOCK (demux);
1654 for (iter = stream->tracks; iter; iter = iter->next) {
1655 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
1657 /* We need to recompute the track's level_time value, as the
1658 * global output position may have advanced and reduced the
1659 * value, even without anything being dequeued yet */
1660 gst_adaptive_demux_track_update_level_locked (track);
1662 GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
1663 " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
1664 track->stream_id, GST_TIME_ARGS (track->level_time),
1665 GST_TIME_ARGS (track->input_time),
1666 GST_TIME_ARGS (demux->priv->global_output_position));
1668 TRACKS_UNLOCK (demux);
1670 while (gst_adaptive_demux2_stream_load_a_fragment (stream));
1672 return G_SOURCE_REMOVE;
1676 gst_adaptive_demux2_stream_on_output_space_available (GstAdaptiveDemux2Stream *
1679 GstAdaptiveDemux *demux = stream->demux;
1681 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
1683 GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
1685 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1686 (GSourceFunc) gst_adaptive_demux2_stream_on_output_space_available_cb,
1687 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1691 gst_adaptive_demux2_stream_on_manifest_update (GstAdaptiveDemux2Stream * stream)
1693 GstAdaptiveDemux *demux = stream->demux;
1695 if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE)
1698 g_assert (stream->pending_cb_id == 0);
1700 GST_LOG_OBJECT (stream, "Scheduling load_a_fragment() call");
1701 stream->pending_cb_id =
1702 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
1703 (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1704 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* React to the playlist running out of fragments for the stream.
 *
 * For a live demuxer (playing at rate 1.0, or with the stream inside the
 * live seek range) that has no next period, the stream is parked in the
 * WAITING_MANIFEST_UPDATE state, a stale GST_FLOW_EOS in last_ret is reset
 * to GST_FLOW_OK and a manifest update is requested. The last statement of
 * the function hands the stream to end_of_manifest().
 *
 * NOTE(review): presumably the live branch returns before reaching
 * end_of_manifest(), and a stream marked as replaced skips it as well -
 * confirm both. */
1708 gst_adaptive_demux2_stream_handle_playlist_eos (GstAdaptiveDemux2Stream *
1711 GstAdaptiveDemux *demux = stream->demux;
1713 if (gst_adaptive_demux_is_live (demux) && (demux->segment.rate == 1.0
1714 || gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))) {
1716 if (!gst_adaptive_demux_has_next_period (demux)) {
1717 /* Wait only if we can ensure current manifest has been expired.
1718 * The meaning "we have next period" *WITH* EOS is that, current
1719 * period has been ended but we can continue to the next period */
1720 GST_DEBUG_OBJECT (stream,
1721 "Live playlist EOS - waiting for manifest update");
1722 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
1723 /* Clear the stream last_ret EOS state, since we're not actually EOS */
1724 if (stream->last_ret == GST_FLOW_EOS)
1725 stream->last_ret = GST_FLOW_OK;
1726 gst_adaptive_demux2_stream_wants_manifest_update (demux);
1730 if (stream->replaced)
1734 gst_adaptive_demux2_stream_end_of_manifest (stream);
/* Advance the download state machine of the stream by one step.
 *
 * pending_cb_id is cleared first, as this function is also what the
 * scheduled callbacks invoke. For the RESTART, START_FRAGMENT and WAITING_*
 * states the fragment info is refreshed through update_fragment_info().
 * When that succeeded the function, in this order:
 *
 *  - parks the stream in WAITING_OUTPUT_SPACE if the output tracks have no
 *    room for another fragment;
 *  - for live streams, schedules a delayed call to itself (state
 *    WAITING_LIVE) if the fragment is not available yet;
 *  - begins the fragment download.
 *
 * Any other flow return is handled by the switch at the end: EOS goes to
 * handle_playlist_eos(), lost sync stops the stream and asks the demuxer to
 * resync, not-linked marks the stream EOS (posting a flow error if the
 * whole input period is not-linked), and errors are counted. Too many
 * errors raise a stream error, otherwise a manifest update is tried once
 * for non-live streams and a retry is scheduled through
 * reload_manifest_cb() after half a fragment duration.
 *
 * NOTE(review): callers loop with `while (load_a_fragment (stream));`, so
 * the result presumably means "call me again right away" - confirm against
 * the individual return statements. */
1738 gst_adaptive_demux2_stream_load_a_fragment (GstAdaptiveDemux2Stream * stream)
1740 GstAdaptiveDemux *demux = stream->demux;
1741 gboolean live = gst_adaptive_demux_is_live (demux);
1742 GstFlowReturn ret = GST_FLOW_OK;
1744 stream->pending_cb_id = 0;
1746 GST_LOG_OBJECT (stream, "entering, state = %d.", stream->state);
1748 switch (stream->state) {
1749 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART:
1750 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT:
1751 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE:
1752 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE:
1753 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
1754 /* Get information about the fragment to download */
1755 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1756 ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1757 GST_DEBUG_OBJECT (stream,
1758 "Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
1760 if (ret == GST_FLOW_OK)
1761 stream->starting_fragment = TRUE;
/* Already downloading: no fragment info refresh, ret stays GST_FLOW_OK */
1763 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING:
1765 case GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS:
1766 GST_ERROR_OBJECT (stream,
1767 "Unexpected stream state EOS. The stream should not be running now.");
1770 GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
1771 g_assert_not_reached ();
1775 if (ret == GST_FLOW_OK) {
1776 /* Wait for room in the output tracks */
1777 if (gst_adaptive_demux2_stream_wait_for_output_space (demux, stream,
1778 stream->fragment.duration)) {
1779 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE;
1784 if (ret == GST_FLOW_OK) {
1785 /* wait for live fragments to be available */
1787 GstClockTime wait_time =
1788 gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
1789 if (wait_time > 0) {
1790 GST_DEBUG_OBJECT (stream,
1791 "Download waiting for %" GST_TIME_FORMAT,
1792 GST_TIME_ARGS (wait_time));
1794 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_LIVE;
1796 GST_LOG_OBJECT (stream, "Scheduling delayed load_a_fragment() call");
1797 g_assert (stream->pending_cb_id == 0);
/* The scheduled callback owns a stream reference, dropped via the
 * destroy notify */
1798 stream->pending_cb_id =
1799 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1800 wait_time, (GSourceFunc) gst_adaptive_demux2_stream_load_a_fragment,
1801 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
1806 if (gst_adaptive_demux2_stream_download_fragment (stream) != GST_FLOW_OK) {
1807 GST_ERROR_OBJECT (demux,
1808 "Failed to begin fragment download for stream %p", stream);
1813 /* Cast to int avoids a compiler warning that
1814 * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
1815 switch ((int) ret) {
1817 break; /* all is good, let's go */
1819 GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
1820 stream->last_ret = ret;
1821 gst_adaptive_demux2_stream_handle_playlist_eos (stream);
1823 case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
1824 GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
1825 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
1826 gst_adaptive_demux_handle_lost_sync (demux);
1828 case GST_FLOW_NOT_LINKED:
1830 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS;
/* Only a period in which every stream is not-linked is a fatal error */
1832 if (gst_adaptive_demux_period_combine_stream_flows (demux->input_period)
1833 == GST_FLOW_NOT_LINKED) {
1834 GST_ELEMENT_FLOW_ERROR (demux, ret);
1839 case GST_FLOW_FLUSHING:
1840 /* Flushing is normal, the target track might have been unselected */
1841 if (G_UNLIKELY (stream->cancelled))
1845 if (ret <= GST_FLOW_ERROR) {
1846 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
1847 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
1848 gst_adaptive_demux2_stream_error (stream);
1852 g_clear_error (&stream->last_error);
1854 /* First try to update the playlist for non-live playlists
1855 * in case the URIs have changed in the meantime. But only
1856 * try it the first time, after that we're going to wait a
1857 * a bit to not flood the server */
1858 if (stream->download_error_count == 1
1859 && !gst_adaptive_demux_is_live (demux)) {
1860 /* TODO hlsdemux had more options to this function (boolean and err) */
1861 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
1862 /* Retry immediately, the playlist actually has changed */
1863 GST_DEBUG_OBJECT (demux, "Updated the playlist");
1868 /* Wait half the fragment duration before retrying */
1869 GST_LOG_OBJECT (stream, "Scheduling delayed reload_manifest_cb() call");
1870 g_assert (stream->pending_cb_id == 0);
1871 stream->pending_cb_id =
1872 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
1873 stream->fragment.duration / 2,
1874 (GSourceFunc) gst_adaptive_demux2_stream_reload_manifest_cb,
1875 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* Decides what @stream does next: completes a pending RESTART (flush, seek
 * back to start_position, state change), checks the stream position against
 * the demuxer segment boundaries, then either signals end of manifest or
 * loads a fragment. gst_adaptive_demux2_stream_start() schedules this
 * function as a GSourceFunc on the scheduler task. */
1885 gst_adaptive_demux2_stream_next_download (GstAdaptiveDemux2Stream * stream)
1887 GstAdaptiveDemux *demux = stream->demux;
1888 gboolean end_of_manifest = FALSE;
1890 GST_LOG_OBJECT (stream, "Looking for next download");
1892 /* Restarting download, figure out new position
1893 * FIXME : Move this to a separate function ? */
1894 if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)) {
1895 GstClockTimeDiff stream_time = 0;
1897 GST_DEBUG_OBJECT (stream, "Activating stream after restart");
1899 if (stream->parsebin_sink != NULL) {
1900 /* If the parsebin already exists, we need to clear it out (if it doesn't,
1901 * this is the first time we've used this stream, so it's all good) */
/* flush-start followed by flush-stop; FALSE is the reset_time argument of
 * gst_event_new_flush_stop() */
1902 gst_adaptive_demux2_stream_push_event (stream,
1903 gst_event_new_flush_start ());
1904 gst_adaptive_demux2_stream_push_event (stream,
1905 gst_event_new_flush_stop (FALSE));
/* start_position is read, and the seek below is issued, while the demuxer
 * segment lock is held */
1908 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1909 stream_time = stream->start_position;
1911 GST_DEBUG_OBJECT (stream, "Restarting stream at "
1912 "stream position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time));
1914 if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
1915 /* TODO check return */
/* Third argument is TRUE for a non-negative segment rate. stream_time is
 * passed both by value and by address; NOTE(review): presumably the seek
 * writes the position it actually landed on back into it - confirm. */
1916 gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
1917 0, stream_time, &stream_time);
/* current_position is reset from start_position, not from the stream_time
 * value handed to the seek */
1918 stream->current_position = stream->start_position;
1920 GST_DEBUG_OBJECT (stream,
1921 "stream_time after restart seek: %" GST_STIME_FORMAT
1922 " position %" GST_STIME_FORMAT, GST_STIME_ARGS (stream_time),
1923 GST_STIME_ARGS (stream->current_position));
1926 /* Trigger (re)computation of the parsebin input segment */
1927 stream->compute_segment = TRUE;
1929 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* Mark the stream as discont and needing a header, then leave RESTART for
 * START_FRAGMENT */
1931 stream->discont = TRUE;
1932 stream->need_header = TRUE;
1933 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
1936 /* Check if we're done with our segment */
/* demux->segment is only read between SEGMENT_LOCK and SEGMENT_UNLOCK; the
 * outcome is carried out of the locked region in end_of_manifest */
1937 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
/* Positive rate: done once current_position has reached segment.stop */
1938 if (demux->segment.rate > 0) {
1939 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
1940 && stream->current_position >= demux->segment.stop) {
1941 end_of_manifest = TRUE;
/* Second check compares against segment.start instead.
 * NOTE(review): presumably the branch for non-positive rates - confirm. */
1944 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
1945 && stream->current_position <= demux->segment.start) {
1946 end_of_manifest = TRUE;
1949 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
/* The end-of-manifest handler is called after the segment lock is released */
1951 if (end_of_manifest) {
1952 gst_adaptive_demux2_stream_end_of_manifest (stream);
/* Otherwise the result of loading a fragment is handed back to the caller */
1955 return gst_adaptive_demux2_stream_load_a_fragment (stream);
/* Asks the demuxer subclass whether @stream can be started, by calling the
 * stream_can_start vfunc of the class of stream->demux. */
1959 gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
1961 GstAdaptiveDemux *demux = stream->demux;
1962 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* The vfunc is optional, so it is checked for NULL before being called.
 * NOTE(review): presumably a missing vfunc means "can start" - confirm. */
1964 if (!klass->stream_can_start)
1966 return klass->stream_can_start (demux, stream);
1970 * gst_adaptive_demux2_stream_start:
1971 * @stream: a #GstAdaptiveDemux2Stream
1973 * Start the given @stream. Should be called by subclasses that previously
1974 * returned %FALSE in `GstAdaptiveDemux::stream_can_start()`
/* Checks, in order, for already pending/active work, a subclass veto via
 * gst_adaptive_demux2_stream_can_start() and the EOS state, (re)activates a
 * STOPPED or RESTART stream, and schedules
 * gst_adaptive_demux2_stream_next_download() on the scheduler task. */
1977 gst_adaptive_demux2_stream_start (GstAdaptiveDemux2Stream * stream)
1979 GstAdaptiveDemux *demux;
/* stream->demux is validated before it is dereferenced below */
1981 g_return_if_fail (stream && stream->demux);
1983 demux = stream->demux;
/* A non-zero pending_cb_id means a scheduler callback is still queued for
 * this stream */
1985 if (stream->pending_cb_id != 0 || stream->download_active) {
1986 /* There is already something active / pending on this stream */
1987 GST_LOG_OBJECT (stream, "Stream already running");
1991 /* Some streams require a delayed start, i.e. they need more information
1992 * before they can actually be started */
1993 if (!gst_adaptive_demux2_stream_can_start (stream)) {
1994 GST_LOG_OBJECT (stream, "Stream will be started asynchronously");
1998 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS) {
1999 GST_LOG_OBJECT (stream, "Stream is EOS already");
/* Only a STOPPED or RESTART stream has its cancelled/replaced flags and
 * last_ret reset */
2003 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2004 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
2005 GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
2007 stream->cancelled = FALSE;
2008 stream->replaced = FALSE;
2009 stream->last_ret = GST_FLOW_OK;
/* RESTART is deliberately left in place here:
 * gst_adaptive_demux2_stream_next_download() tests for it and performs the
 * restart handling itself */
2011 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2012 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_START_FRAGMENT;
2015 GST_LOG_OBJECT (stream, "Scheduling next_download() call");
/* The callback data is a new reference on the stream, released through the
 * gst_object_unref destroy notify; the returned id is kept in pending_cb_id
 * so that gst_adaptive_demux2_stream_stop() can cancel the call */
2016 stream->pending_cb_id =
2017 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2018 (GSourceFunc) gst_adaptive_demux2_stream_next_download,
2019 gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
/* Moves @stream to the STOPPED state, cancels any pending scheduler callback
 * and replaces the current download request with a fresh one. */
2023 gst_adaptive_demux2_stream_stop (GstAdaptiveDemux2Stream * stream)
2025 GstAdaptiveDemux *demux = stream->demux;
/* The previous state is logged before it is overwritten */
2027 GST_DEBUG_OBJECT (stream, "Stopping stream (from state %d)", stream->state);
2028 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
/* Cancel the queued scheduler call, if any, and forget its id */
2030 if (stream->pending_cb_id != 0) {
2031 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2032 stream->pending_cb_id);
2033 stream->pending_cb_id = 0;
2036 /* Cancel and drop the existing download request */
/* The old request is cancelled and unreffed, then stream->download_request is
 * pointed at a newly created request; the header/index download flags and
 * download_active are cleared alongside */
2037 downloadhelper_cancel_request (demux->download_helper,
2038 stream->download_request);
2039 download_request_unref (stream->download_request);
2040 stream->downloading_header = stream->downloading_index = FALSE;
2041 stream->download_request = download_request_new ();
2042 stream->download_active = FALSE;
/* Reset the next input wakeup time to "none" */
2044 stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
/* Classifies @stream by its state: STOPPED, RESTART and EOS are each tested
 * on their own.
 * NOTE(review): presumably each of these three states reports "not running"
 * and every other state reports "running" - confirm. */
2048 gst_adaptive_demux2_stream_is_running (GstAdaptiveDemux2Stream * stream)
2050 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
2052 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART)
2054 if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_EOS)
/* Walks the stream->tracks list and tests the 'selected' flag of each
 * GstAdaptiveDemuxTrack. Takes no lock itself:
 * gst_adaptive_demux2_stream_is_selected() calls it with TRACKS_LOCK held. */
2060 gst_adaptive_demux2_stream_is_selected_locked (GstAdaptiveDemux2Stream * stream)
/* Each list node's data pointer is a GstAdaptiveDemuxTrack */
2064 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2065 GstAdaptiveDemuxTrack *track = tmp->data;
2066 if (track->selected)
2074 * gst_adaptive_demux2_stream_is_selected:
2075 * @stream: A #GstAdaptiveDemux2Stream
2077 * Returns: %TRUE if any of the tracks targeted by @stream is selected
/* Locking wrapper around gst_adaptive_demux2_stream_is_selected_locked():
 * validates @stream, then performs the check with the demuxer's tracks lock
 * held. */
2080 gst_adaptive_demux2_stream_is_selected (GstAdaptiveDemux2Stream * stream)
/* A NULL @stream or a stream without a demuxer yields FALSE */
2084 g_return_val_if_fail (stream && stream->demux, FALSE);
/* The result is stored in ret so that the lock can be released before it is
 * used */
2086 TRACKS_LOCK (stream->demux);
2087 ret = gst_adaptive_demux2_stream_is_selected_locked (stream);
2088 TRACKS_UNLOCK (stream->demux);