#include "config.h"
#endif
-#include "gstadaptivedemux.h"
+#include "gstadaptivedemux-stream.h"
#include "gstadaptivedemux-private.h"
#include <glib/gi18n-lib.h>
static void gst_adaptive_demux2_stream_finalize (GObject * object);
static void gst_adaptive_demux2_stream_error (GstAdaptiveDemux2Stream * stream);
+static GstFlowReturn
+gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
+ stream, GstBuffer * buffer);
+static GstFlowReturn
+gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
+ stream);
+
+guint64
+gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
+ stream);
+static void gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream
+ * stream);
#define gst_adaptive_demux2_stream_parent_class parent_class
G_DEFINE_ABSTRACT_TYPE (GstAdaptiveDemux2Stream, gst_adaptive_demux2_stream,
GObjectClass *gobject_class = (GObjectClass *) klass;
gobject_class->finalize = gst_adaptive_demux2_stream_finalize;
+
+ klass->data_received = gst_adaptive_demux2_stream_data_received_default;
+ klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
}
static GType tsdemux_type = 0;
stream->download_request = download_request_new ();
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
stream->last_ret = GST_FLOW_OK;
+ stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
stream->fragment_bitrates =
g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
if (stream->download_request)
download_request_unref (stream->download_request);
- stream->cancelled = TRUE;
g_clear_error (&stream->last_error);
gst_adaptive_demux2_stream_fragment_clear (&stream->fragment);
if (stream->pending_caps)
gst_caps_unref (stream->pending_caps);
- g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
+ gst_clear_tag_list (&stream->pending_tags);
g_clear_pointer (&stream->stream_collection, gst_object_unref);
G_OBJECT_CLASS (parent_class)->finalize (object);
return FALSE;
}
+ if (stream->demux->buffering_low_watermark_time)
+ track->buffering_threshold = stream->demux->buffering_low_watermark_time;
+ else if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
+ track->buffering_threshold =
+ MIN (10 * GST_SECOND, stream->recommended_buffering_threshold);
+ else {
+ /* Using a starting default, can be overriden later in
+ * ::update_stream_info() */
+ GST_DEBUG_OBJECT (stream,
+ "Setting default 10s buffering threshold on new track");
+ track->buffering_threshold = 10 * GST_SECOND;
+ }
+
stream->tracks =
g_list_append (stream->tracks, gst_adaptive_demux_track_ref (track));
if (stream->demux) {
}
static void
-drain_inactive_tracks (GstAdaptiveDemux * demux,
- GstAdaptiveDemux2Stream * stream)
+drain_inactive_tracks (GstAdaptiveDemux2Stream * stream)
{
GList *iter;
+ GstAdaptiveDemux *demux = stream->demux;
TRACKS_LOCK (demux);
for (iter = stream->tracks; iter; iter = iter->next) {
gst_adaptive_demux2_stream_finish_download (GstAdaptiveDemux2Stream *
stream, GstFlowReturn ret, GError * err)
{
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
- GstAdaptiveDemux *demux = stream->demux;
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
GST_DEBUG_OBJECT (stream,
"%s download finish: %d %s - err: %p", uritype (stream), ret,
}
/* Handle all the possible flow returns here: */
- if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
+ if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC) {
+ /* We lost sync, seek back to live and return */
+ GST_WARNING_OBJECT (stream, "Lost sync when downloading");
+ gst_adaptive_demux_handle_lost_sync (stream->demux);
+ return;
+ } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
/* The sub-class wants to stop the fragment immediately */
stream->fragment.finished = TRUE;
- ret = klass->finish_fragment (demux, stream);
+ ret = klass->finish_fragment (stream);
GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
gst_flow_get_name (ret));
|| !klass->need_another_chunk (stream)
|| stream->fragment.chunk_size == 0) {
stream->fragment.finished = TRUE;
- ret = klass->finish_fragment (stream->demux, stream);
+ ret = klass->finish_fragment (stream);
GST_DEBUG_OBJECT (stream, "finish_fragment ret %d %s", ret,
gst_flow_get_name (ret));
/* For HLS, we might be enqueueing data into tracks that aren't
* selected. Drain those ones out */
- drain_inactive_tracks (stream->demux, stream);
+ drain_inactive_tracks (stream);
/* Now that we've called finish_fragment we can clear these flags the
* sub-class might have checked */
GST_LOG_OBJECT (stream, "Scheduling next_download() call");
stream->pending_cb_id =
- gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
+ gst_adaptive_demux_loop_call (stream->demux->priv->scheduler_task,
(GSourceFunc) gst_adaptive_demux2_stream_next_download,
gst_object_ref (stream), (GDestroyNotify) gst_object_unref);
}
}
static void
-gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux * demux,
- GstAdaptiveDemux2Stream * stream, gboolean first_and_live)
+gst_adaptive_demux2_stream_prepare_segment (GstAdaptiveDemux2Stream * stream,
+ gboolean first_and_live)
{
+ GstAdaptiveDemux *demux = stream->demux;
GstClockTime period_start = gst_adaptive_demux_get_period_start_time (demux);
GstClockTime offset =
- gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
+ gst_adaptive_demux2_stream_get_presentation_offset (stream);
+ /* FIXME: Add a helper function to retrieve the demuxer segment
+ * using the SEGMENT_LOCK */
stream->parse_segment = demux->segment;
/* The demuxer segment is just built from seek events, but for each stream
* the segment time and base as calculated by the second case would be
* equivalent.
*/
- GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
- &demux->segment);
+ GST_DEBUG_OBJECT (stream, "Using demux segment %" GST_SEGMENT_FORMAT,
+ &stream->parse_segment);
+
GST_DEBUG_OBJECT (demux,
"period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
if (GST_CLOCK_STIME_IS_VALID (pos)) {
GstClockTime offset =
- gst_adaptive_demux2_stream_get_presentation_offset (demux, stream);
+ gst_adaptive_demux2_stream_get_presentation_offset (stream);
pos += offset;
GList *pending_events = NULL;
if (stream->compute_segment) {
- gst_adaptive_demux2_stream_prepare_segment (demux, stream,
- stream->first_and_live);
+ gst_adaptive_demux2_stream_prepare_segment (stream, stream->first_and_live);
stream->compute_segment = FALSE;
stream->first_and_live = FALSE;
}
gst_pad_send_event (stream->parsebin_sink, buffer_gap);
}
- if (G_UNLIKELY (stream->cancelled)) {
+ if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
GST_LOG_OBJECT (demux, "Stream was cancelled");
return GST_FLOW_FLUSHING;
}
GstBuffer * buffer)
{
GstAdaptiveDemux *demux = stream->demux;
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
GstFlowReturn ret = GST_FLOW_OK;
/* do not make any changes if the stream is cancelled */
- if (G_UNLIKELY (stream->cancelled)) {
+ if (G_UNLIKELY (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)) {
+ GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
gst_buffer_unref (buffer);
return GST_FLOW_FLUSHING;
}
* including the *actual* fragment ! */
if (stream->starting_fragment) {
stream->starting_fragment = FALSE;
- if (klass->start_fragment != NULL && !klass->start_fragment (demux, stream))
+ if (klass->start_fragment != NULL && !klass->start_fragment (stream))
return GST_FLOW_ERROR;
}
"Received %s buffer of size %" G_GSIZE_FORMAT, uritype (stream),
gst_buffer_get_size (buffer));
- ret = klass->data_received (demux, stream, buffer);
+ ret = klass->data_received (stream, buffer);
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (stream, "data_received returned %s",
if (ret == GST_FLOW_FLUSHING) {
/* do not make any changes if the stream is cancelled */
- if (G_UNLIKELY (stream->cancelled)) {
+ if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED) {
+ GST_DEBUG_OBJECT (stream, "Stream was stopped. Aborting");
return ret;
}
}
GST_DEBUG_OBJECT (stream, "Pushing EOS to parser");
/* TODO push this on all pads */
- gst_event_set_seqnum (eos, stream->demux->priv->segment_seqnum);
+ gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
gst_pad_send_event (stream->parsebin_sink, eos);
ret = GST_FLOW_ERROR;
*/
static void
calculate_track_thresholds (GstAdaptiveDemux * demux,
+ GstAdaptiveDemux2Stream * stream,
GstClockTime fragment_duration, GstClockTime * low_threshold,
GstClockTime * high_threshold)
{
*low_threshold = demux->buffering_low_watermark_time;
}
+ if (*low_threshold == 0) {
+ /* This implies both low level properties were 0, the default is 10s unless
+ * the subclass has specified a recommended buffering threshold */
+ *low_threshold = 10 * GST_SECOND;
+ if (GST_CLOCK_TIME_IS_VALID (stream->recommended_buffering_threshold))
+ *low_threshold =
+ MIN (stream->recommended_buffering_threshold, *low_threshold);
+ }
+
*high_threshold =
demux->buffering_high_watermark_fragments * fragment_duration;
if (*high_threshold == 0 || (demux->buffering_high_watermark_time != 0
(*low_threshold != 0 && *low_threshold > *high_threshold)) {
*high_threshold = *low_threshold;
}
+
GST_OBJECT_UNLOCK (demux);
}
+#define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
static gboolean
gst_adaptive_demux2_stream_wait_for_output_space (GstAdaptiveDemux * demux,
GstAdaptiveDemux2Stream * stream, GstClockTime fragment_duration)
GstClockTime low_threshold = 0, high_threshold = 0;
GList *iter;
- calculate_track_thresholds (demux, fragment_duration,
+ calculate_track_thresholds (demux, stream, fragment_duration,
&low_threshold, &high_threshold);
+ GST_DEBUG_OBJECT (stream,
+ "Thresholds low:%" GST_TIME_FORMAT " high:%" GST_TIME_FORMAT
+ " recommended:%" GST_TIME_FORMAT, GST_TIME_ARGS (low_threshold),
+ GST_TIME_ARGS (high_threshold),
+ GST_TIME_ARGS (stream->recommended_buffering_threshold));
/* If there are no tracks at all, don't wait. If there are no active
* tracks, keep filling until at least one track is full. If there
for (iter = stream->tracks; iter; iter = iter->next) {
GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
- /* Update the buffering threshold */
- if (low_threshold != track->buffering_threshold) {
+ /* Update the buffering threshold if it changed by more than a second */
+ if (ABSDIFF (low_threshold, track->buffering_threshold) > GST_SECOND) {
+ GST_DEBUG_OBJECT (stream, "Updating threshold");
/* The buffering threshold for this track changed, make sure to
* re-check buffering status */
update_buffering = TRUE;
if (track->level_time < high_threshold) {
if (track->active) {
need_to_wait = FALSE;
- GST_DEBUG_OBJECT (demux,
- "stream %p track %s has level %" GST_TIME_FORMAT
+ GST_DEBUG_OBJECT (stream,
+ "track %s has level %" GST_TIME_FORMAT
" - needs more data (target %" GST_TIME_FORMAT
- ") (fragment duration %" GST_TIME_FORMAT ")", stream,
+ ") (fragment duration %" GST_TIME_FORMAT ")",
track->stream_id, GST_TIME_ARGS (track->level_time),
GST_TIME_ARGS (high_threshold), GST_TIME_ARGS (fragment_duration));
continue;
have_filled_inactive = TRUE;
}
- GST_DEBUG_OBJECT (demux,
- "stream %p track %s active (%d) has level %" GST_TIME_FORMAT,
- stream, track->stream_id, track->active,
- GST_TIME_ARGS (track->level_time));
+ GST_DEBUG_OBJECT (stream,
+ "track %s active (%d) has level %" GST_TIME_FORMAT,
+ track->stream_id, track->active, GST_TIME_ARGS (track->level_time));
}
/* If there are no tracks, don't wait (we might need data to create them),
* or if there are active tracks that need more data to hit the threshold,
* don't wait. Otherwise it means all active tracks are full and we should wait */
if (!have_any_tracks) {
- GST_DEBUG_OBJECT (demux, "stream %p has no tracks - not waiting", stream);
+ GST_DEBUG_OBJECT (stream, "no tracks created yet - not waiting");
need_to_wait = FALSE;
} else if (!have_active_tracks && !have_filled_inactive) {
- GST_DEBUG_OBJECT (demux,
- "stream %p has inactive tracks that need more data - not waiting",
- stream);
+ GST_DEBUG_OBJECT (stream,
+ "have only inactive tracks that need more data - not waiting");
need_to_wait = FALSE;
}
if (need_to_wait) {
+ stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
+
for (iter = stream->tracks; iter; iter = iter->next) {
GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
- track->waiting_del_level = high_threshold;
- GST_DEBUG_OBJECT (demux,
- "Waiting for queued data on stream %p track %s to drop below %"
+
+ GST_DEBUG_OBJECT (stream,
+ "Waiting for queued data on track %s to drop below %"
GST_TIME_FORMAT " (fragment duration %" GST_TIME_FORMAT ")",
- stream, track->stream_id, GST_TIME_ARGS (track->waiting_del_level),
+ track->stream_id, GST_TIME_ARGS (high_threshold),
GST_TIME_ARGS (fragment_duration));
+
+ /* we want to get woken up when the global output position reaches
+ * a point where the input is closer than "high_threshold" to needing
+ * output, so we can put more data in */
+ GstClockTimeDiff wakeup_time = track->input_time - high_threshold;
+
+ if (stream->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
+ wakeup_time < stream->next_input_wakeup_time) {
+ stream->next_input_wakeup_time = wakeup_time;
+
+ GST_DEBUG_OBJECT (stream,
+ "Track %s level %" GST_TIME_FORMAT ". Input at position %"
+ GST_TIME_FORMAT " next wakeup should be %" GST_TIME_FORMAT " now %"
+ GST_TIME_FORMAT, track->stream_id,
+ GST_TIME_ARGS (track->level_time),
+ GST_TIME_ARGS (track->input_time), GST_TIME_ARGS (wakeup_time),
+ GST_TIME_ARGS (demux->priv->global_output_position));
+ }
+ }
+
+ if (stream->next_input_wakeup_time != GST_CLOCK_TIME_NONE) {
+ GST_DEBUG_OBJECT (stream,
+ "Next input wakeup time is now %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->next_input_wakeup_time));
+
+ /* If this stream needs waking up sooner than any other current one,
+ * update the period wakeup time, which is what the output loop
+ * will check */
+ GstAdaptiveDemuxPeriod *period = stream->period;
+ if (period->next_input_wakeup_time == GST_CLOCK_STIME_NONE ||
+ period->next_input_wakeup_time > stream->next_input_wakeup_time) {
+ period->next_input_wakeup_time = stream->next_input_wakeup_time;
+ }
}
}
stream_type = gst_stream_get_stream_type (gst_stream);
GST_DEBUG_OBJECT (pad,
- "Trying to match pad from parsebin with internal streamid %s and caps %"
- GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id),
- gst_stream_get_caps (gst_stream));
+ "Trying to match pad from parsebin with internal streamid %s and stream %"
+ GST_PTR_FORMAT, GST_STR_NULL (internal_stream_id), gst_stream);
/* Try to match directly by the track's pending upstream_stream_id */
for (tmp = stream->tracks; tmp; tmp = tmp->next) {
guint last_status_code = request->status_code;
gboolean live;
+ if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
+ GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
+ stream->state);
+ return;
+ }
+
stream->download_active = FALSE;
stream->last_status_code = last_status_code;
|| last_status_code / 100 == 5)) {
/* 4xx/5xx */
/* if current position is before available start, switch to next */
- if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream))
+ if (!gst_adaptive_demux2_stream_has_next_fragment (stream))
goto flushing;
if (live) {
GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
- ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
+ ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
GST_DEBUG_OBJECT (stream, "update_fragment_info ret: %s",
gst_flow_get_name (ret));
} else if (demux->segment.position > range_stop) {
/* wait a bit to be in range, we don't have any locks at that point */
GstClockTime wait_time =
- gst_adaptive_demux2_stream_get_fragment_waiting_time (demux,
- stream);
+ gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
if (wait_time > 0) {
GST_DEBUG_OBJECT (stream,
"Download waiting for %" GST_TIME_FORMAT,
gst_adaptive_demux2_stream_handle_playlist_eos (stream);
return;
}
- } else if (!gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
+ } else if (!gst_adaptive_demux2_stream_has_next_fragment (stream)) {
/* If this is the last fragment, consider failures EOS and not actual
* errors. Due to rounding errors in the durations, the last fragment
* might not actually exist */
stream->download_active = FALSE;
- if (G_UNLIKELY (stream->cancelled))
+ if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_DOWNLOADING) {
+ GST_DEBUG_OBJECT (stream, "Stream state changed to %d. Aborting",
+ stream->state);
return;
+ }
GST_DEBUG_OBJECT (stream,
"Stream %p %s download for %s is complete with state %d",
gst_adaptive_demux2_stream_download_fragment (GstAdaptiveDemux2Stream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
gchar *url = NULL;
/* FIXME : */
if (stream->state != GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_OUTPUT_SPACE)
return G_SOURCE_REMOVE;
+ GstAdaptiveDemux *demux = stream->demux;
+ TRACKS_LOCK (demux);
+
+ GList *iter;
+ for (iter = stream->tracks; iter; iter = iter->next) {
+ GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
+
+ /* We need to recompute the track's level_time value, as the
+ * global output position may have advanced and reduced the
+ * value, even without anything being dequeued yet */
+ gst_adaptive_demux_track_update_level_locked (track);
+
+ GST_DEBUG_OBJECT (stream, "track %s woken level %" GST_TIME_FORMAT
+ " input position %" GST_TIME_FORMAT " at %" GST_TIME_FORMAT,
+ track->stream_id, GST_TIME_ARGS (track->level_time),
+ GST_TIME_ARGS (track->input_time),
+ GST_TIME_ARGS (demux->priv->global_output_position));
+ }
+ TRACKS_UNLOCK (demux);
+
while (gst_adaptive_demux2_stream_load_a_fragment (stream));
return G_SOURCE_REMOVE;
stream)
{
GstAdaptiveDemux *demux = stream->demux;
- GList *iter;
- for (iter = stream->tracks; iter; iter = iter->next) {
- GstAdaptiveDemuxTrack *tmp_track = (GstAdaptiveDemuxTrack *) iter->data;
- tmp_track->waiting_del_level = 0;
- }
+ stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
GST_LOG_OBJECT (stream, "Scheduling output_space_available() call");
GST_DEBUG_OBJECT (stream,
"Live playlist EOS - waiting for manifest update");
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE;
+ /* Clear the stream last_ret EOS state, since we're not actually EOS */
+ if (stream->last_ret == GST_FLOW_EOS)
+ stream->last_ret = GST_FLOW_OK;
gst_adaptive_demux2_stream_wants_manifest_update (demux);
return;
}
-
- if (stream->replaced)
- return;
}
gst_adaptive_demux2_stream_end_of_manifest (stream);
case GST_ADAPTIVE_DEMUX2_STREAM_STATE_WAITING_MANIFEST_UPDATE:
/* Get information about the fragment to download */
GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
- ret = gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
+ ret = gst_adaptive_demux2_stream_update_fragment_info (stream);
GST_DEBUG_OBJECT (stream,
"Fragment info update result: %d %s", ret, gst_flow_get_name (ret));
GST_ERROR_OBJECT (stream,
"Unexpected stream state EOS. The stream should not be running now.");
return FALSE;
+ case GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED:
+ /* The stream was stopped. Just finish up */
+ return FALSE;
default:
GST_ERROR_OBJECT (stream, "Unexpected stream state %d", stream->state);
g_assert_not_reached ();
/* wait for live fragments to be available */
if (live) {
GstClockTime wait_time =
- gst_adaptive_demux2_stream_get_fragment_waiting_time (demux, stream);
+ gst_adaptive_demux2_stream_get_fragment_waiting_time (stream);
if (wait_time > 0) {
GST_DEBUG_OBJECT (stream,
"Download waiting for %" GST_TIME_FORMAT,
}
}
- switch (ret) {
+ /* Cast to int avoids a compiler warning that
+ * GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC is not in the GstFlowReturn enum */
+ switch ((int) ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
case GST_FLOW_EOS:
GST_DEBUG_OBJECT (stream, "EOS, checking to stop download loop");
+ stream->last_ret = ret;
gst_adaptive_demux2_stream_handle_playlist_eos (stream);
return FALSE;
case GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC:
GST_DEBUG_OBJECT (stream, "Lost sync, asking reset to current position");
stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED;
- gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
- (GSourceFunc) gst_adaptive_demux_handle_lost_sync, demux, NULL);
+ gst_adaptive_demux_handle_lost_sync (demux);
return FALSE;
case GST_FLOW_NOT_LINKED:
{
case GST_FLOW_FLUSHING:
/* Flushing is normal, the target track might have been unselected */
- if (G_UNLIKELY (stream->cancelled))
- return FALSE;
-
+ GST_DEBUG_OBJECT (stream, "Got flushing return. Stopping callback.");
+ return FALSE;
default:
if (ret <= GST_FLOW_ERROR) {
GST_WARNING_OBJECT (demux, "Error while downloading fragment");
if (GST_CLOCK_STIME_IS_VALID (stream_time)) {
/* TODO check return */
- gst_adaptive_demux2_stream_seek (demux, stream, demux->segment.rate >= 0,
+ gst_adaptive_demux2_stream_seek (stream, demux->segment.rate >= 0,
0, stream_time, &stream_time);
stream->current_position = stream->start_position;
static gboolean
gst_adaptive_demux2_stream_can_start (GstAdaptiveDemux2Stream * stream)
{
- GstAdaptiveDemux *demux = stream->demux;
- GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
- if (!klass->stream_can_start)
+ if (!klass->can_start)
return TRUE;
- return klass->stream_can_start (demux, stream);
+ return klass->can_start (stream);
}
/**
stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART) {
GST_LOG_OBJECT (stream, "Activating stream. Current state %d",
stream->state);
- stream->cancelled = FALSE;
- stream->replaced = FALSE;
stream->last_ret = GST_FLOW_OK;
if (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED)
stream->downloading_header = stream->downloading_index = FALSE;
stream->download_request = download_request_new ();
stream->download_active = FALSE;
+
+ stream->next_input_wakeup_time = GST_CLOCK_STIME_NONE;
}
gboolean
return ret;
}
+
+/* Called from the scheduler task */
+GstClockTime
+gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux2Stream *
+ stream)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+ if (klass->get_presentation_offset == NULL)
+ return 0;
+
+ return klass->get_presentation_offset (stream);
+}
+
+GstFlowReturn
+gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux2Stream *
+ stream)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+ GstFlowReturn ret;
+
+ g_return_val_if_fail (klass->update_fragment_info != NULL, GST_FLOW_ERROR);
+
+ /* Make sure the sub-class will update bitrate, or else
+ * we will later */
+ stream->fragment.finished = FALSE;
+
+ GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->current_position));
+
+ ret = klass->update_fragment_info (stream);
+
+ GST_LOG_OBJECT (stream, "ret:%s uri:%s",
+ gst_flow_get_name (ret), stream->fragment.uri);
+ if (ret == GST_FLOW_OK) {
+ GST_LOG_OBJECT (stream,
+ "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
+ GST_STIME_ARGS (stream->fragment.stream_time),
+ GST_TIME_ARGS (stream->fragment.duration));
+ GST_LOG_OBJECT (stream,
+ "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
+ stream->fragment.range_start, stream->fragment.range_end);
+ }
+
+ return ret;
+}
+
+static GstFlowReturn
+gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux2Stream *
+ stream, GstBuffer * buffer)
+{
+ return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
+}
+
+static GstFlowReturn
+gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux2Stream *
+ stream)
+{
+ /* No need to advance, this isn't a real fragment */
+ if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
+ return GST_FLOW_OK;
+
+ return gst_adaptive_demux2_stream_advance_fragment (stream,
+ stream->fragment.duration);
+}
+
+/* must be called from the scheduler */
+gboolean
+gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux2Stream * stream)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+ gboolean ret = TRUE;
+
+ if (klass->has_next_fragment)
+ ret = klass->has_next_fragment (stream);
+
+ return ret;
+}
+
+/* must be called from the scheduler */
+GstFlowReturn
+gst_adaptive_demux2_stream_seek (GstAdaptiveDemux2Stream * stream,
+ gboolean forward, GstSeekFlags flags,
+ GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+ if (klass->stream_seek)
+ return klass->stream_seek (stream, forward, flags, ts, final_ts);
+ return GST_FLOW_ERROR;
+}
+
+static gboolean
+gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
+ demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+ if (klass->select_bitrate)
+ return klass->select_bitrate (stream, bitrate);
+ return FALSE;
+}
+
+GstClockTime
+gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux2Stream *
+ stream)
+{
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+
+ if (klass->get_fragment_waiting_time)
+ return klass->get_fragment_waiting_time (stream);
+ return 0;
+}
+
+/* must be called from the scheduler */
+/* Called from: the ::finish_fragment() handlers when an *actual* fragment is
+ * done
+ *
+ * @duration: Is the duration of the advancement starting from
+ * stream->current_position which might not be the fragment duration after a
+ * seek.
+ */
+GstFlowReturn
+gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux2Stream * stream,
+ GstClockTime duration)
+{
+ if (stream->last_ret != GST_FLOW_OK)
+ return stream->last_ret;
+
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+ GstAdaptiveDemux *demux = stream->demux;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ g_assert (klass->advance_fragment != NULL);
+
+ GST_LOG_OBJECT (stream,
+ "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
+ GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
+
+ stream->download_error_count = 0;
+ g_clear_error (&stream->last_error);
+
+#if 0
+ /* FIXME - url has no indication of byte ranges for subsegments */
+ /* FIXME: Reenable statistics sending? */
+ gst_element_post_message (GST_ELEMENT_CAST (demux),
+ gst_message_new_element (GST_OBJECT_CAST (demux),
+ gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
+ "manifest-uri", G_TYPE_STRING,
+ demux->manifest_uri, "uri", G_TYPE_STRING,
+ stream->fragment.uri, "fragment-start-time",
+ GST_TYPE_CLOCK_TIME, stream->download_start_time,
+ "fragment-stop-time", GST_TYPE_CLOCK_TIME,
+ gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
+ stream->download_total_bytes, "fragment-download-time",
+ GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
+#endif
+
+ /* Don't update to the end of the segment if in reverse playback */
+ GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
+ if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
+ stream->parse_segment.position += duration;
+ stream->current_position += duration;
+
+ GST_DEBUG_OBJECT (stream,
+ "stream position now %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->current_position));
+ }
+ GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
+
+ /* When advancing with a non 1.0 rate on live streams, we need to check
+ * the live seeking range again to make sure we can still advance to
+ * that position */
+ if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
+ if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
+ ret = GST_FLOW_EOS;
+ else
+ ret = klass->advance_fragment (stream);
+ } else if (gst_adaptive_demux_is_live (demux)
+ || gst_adaptive_demux2_stream_has_next_fragment (stream)) {
+ ret = klass->advance_fragment (stream);
+ } else {
+ ret = GST_FLOW_EOS;
+ }
+
+ stream->download_start_time =
+ GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
+
+ /* Always check if we need to switch bitrate on OK, or when live
+ * (it's normal to have EOS on advancing in live when we hit the
+ * end of the manifest) */
+ if (ret == GST_FLOW_OK || gst_adaptive_demux_is_live (demux)) {
+ GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
+ if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
+ gst_adaptive_demux2_stream_update_current_bitrate (stream))) {
+ GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
+ stream->need_header = TRUE;
+ ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
+ }
+ }
+
+ stream->last_ret = ret;
+ return stream->last_ret;
+}
+
+/* TRACKS_LOCK held */
+static GstAdaptiveDemuxTrack *
+gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
+ GstStreamType stream_type)
+{
+ GList *iter;
+
+ for (iter = stream->tracks; iter; iter = iter->next) {
+ GstAdaptiveDemuxTrack *track = iter->data;
+
+ if (track->type == stream_type)
+ return track;
+ }
+
+ return NULL;
+}
+
+/* TRACKS lock held */
+static void
+gst_adaptive_demux2_stream_update_track_ids (GstAdaptiveDemux2Stream * stream)
+{
+ guint i;
+
+ GST_DEBUG_OBJECT (stream, "Updating track information from collection");
+
+ for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
+ i++) {
+ GstStream *gst_stream =
+ gst_stream_collection_get_stream (stream->stream_collection, i);
+ GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
+ GstAdaptiveDemuxTrack *track;
+
+ if (stream_type == GST_STREAM_TYPE_UNKNOWN)
+ continue;
+ track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
+ if (!track) {
+ GST_DEBUG_OBJECT (stream,
+ "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
+ gst_stream);
+ continue;
+ }
+
+ if (track->upstream_stream_id)
+ g_free (track->upstream_stream_id);
+ track->upstream_stream_id =
+ g_strdup (gst_stream_get_stream_id (gst_stream));
+ }
+
+}
+
+static gboolean
+tags_have_language_info (GstTagList * tags)
+{
+ const gchar *language = NULL;
+
+ if (tags == NULL)
+ return FALSE;
+
+ if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
+ &language))
+ return TRUE;
+ if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
+ &language))
+ return TRUE;
+
+ return FALSE;
+}
+
+static gboolean
+can_handle_collection (GstAdaptiveDemux2Stream * stream,
+ GstStreamCollection * collection)
+{
+ guint i;
+ guint nb_audio, nb_video, nb_text;
+ gboolean have_audio_languages = TRUE;
+ gboolean have_text_languages = TRUE;
+
+ nb_audio = nb_video = nb_text = 0;
+
+ for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
+ GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
+ GstTagList *tags = gst_stream_get_tags (gst_stream);
+
+ GST_DEBUG_OBJECT (stream,
+ "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
+ switch (gst_stream_get_stream_type (gst_stream)) {
+ case GST_STREAM_TYPE_AUDIO:
+ have_audio_languages &= tags_have_language_info (tags);
+ nb_audio++;
+ break;
+ case GST_STREAM_TYPE_VIDEO:
+ nb_video++;
+ break;
+ case GST_STREAM_TYPE_TEXT:
+ have_text_languages &= tags_have_language_info (tags);
+ nb_text++;
+ break;
+ default:
+ break;
+ }
+ if (tags)
+ gst_tag_list_unref (tags);
+ }
+
+ /* Check that we either have at most 1 of each track type, or that
+ * we have language tags for each to tell which is which */
+ if (nb_video > 1 ||
+ (nb_audio > 1 && !have_audio_languages) ||
+ (nb_text > 1 && !have_text_languages)) {
+ GST_WARNING
+ ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
+ nb_audio, nb_video, nb_text);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/* Called from the demuxer when it receives a GstStreamCollection on the bus
+ * for this stream. */
+/* TRACKS lock held */
+gboolean
+gst_adaptive_demux2_stream_handle_collection (GstAdaptiveDemux2Stream * stream,
+ GstStreamCollection * collection, gboolean * had_pending_tracks)
+{
+ g_assert (had_pending_tracks != NULL);
+
+ /* Check whether the collection is "sane" or not.
+ *
+ * In the context of adaptive streaming, we can only handle multiplexed
+ * content where the output sub-streams can be matched reliably to the various
+ * tracks. That is, only a single stream of each type, or if there are
+ * multiple audio/subtitle tracks, they can be differentiated by language
+ * (and possibly in the future by codec).
+ */
+ if (!can_handle_collection (stream, collection)) {
+ return FALSE;
+ }
+
+ /* Store the collection on the stream */
+ gst_object_replace ((GstObject **) & stream->stream_collection,
+ (GstObject *) collection);
+
+ /* If stream is marked as having pending_tracks, ask the subclass to
+ * handle that and create the tracks now */
+ if (stream->pending_tracks) {
+ GstAdaptiveDemux2StreamClass *klass =
+ GST_ADAPTIVE_DEMUX2_STREAM_GET_CLASS (stream);
+ g_assert (klass->create_tracks);
+ klass->create_tracks (stream);
+ stream->pending_tracks = FALSE;
+ *had_pending_tracks = TRUE;
+ } else {
+ g_assert (stream->tracks);
+
+ /* Now we should have assigned tracks, match them to the
+ * collection and update the pending upstream stream_id
+ * for each of them based on the collection information. */
+ gst_adaptive_demux2_stream_update_track_ids (stream);
+ }
+
+ return TRUE;
+}
+
+static guint64
+_update_average_bitrate (GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
+{
+ gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
+
+ stream->moving_bitrate -= stream->fragment_bitrates[index];
+ stream->fragment_bitrates[index] = new_bitrate;
+ stream->moving_bitrate += new_bitrate;
+
+ stream->moving_index += 1;
+
+ if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
+ return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
+ return stream->moving_bitrate / stream->moving_index;
+}
+
+guint64
+gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux2Stream *
+ stream)
+{
+ guint64 average_bitrate;
+ guint64 fragment_bitrate;
+ guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
+
+ fragment_bitrate = stream->last_bitrate;
+ GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
+ fragment_bitrate);
+
+ average_bitrate = _update_average_bitrate (stream, fragment_bitrate);
+
+ GST_INFO_OBJECT (stream,
+ "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
+ GST_INFO_OBJECT (stream,
+ "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
+ NUM_LOOKBACK_FRAGMENTS, average_bitrate);
+
+ /* Conservative approach, make sure we don't upgrade too fast */
+ stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
+
+ /* For the video stream, update the demuxer reported download
+ * rate. FIXME: Move all bandwidth estimation to the
+ * download helper and make it the demuxer's responsibility
+ * to select the right set of things to download within
+ * that bandwidth */
+ GstAdaptiveDemux *demux = stream->demux;
+ GST_OBJECT_LOCK (demux);
+
+ /* If this is stream containing our video, update the overall demuxer
+ * reported bitrate and notify, to give the application a
+ * chance to choose a new connection-bitrate */
+ if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
+ demux->current_download_rate = stream->current_download_rate;
+ GST_OBJECT_UNLOCK (demux);
+ g_object_notify (G_OBJECT (demux), "current-bandwidth");
+ GST_OBJECT_LOCK (demux);
+ }
+
+ connection_speed = demux->connection_speed;
+ min_bitrate = demux->min_bitrate;
+ max_bitrate = demux->max_bitrate;
+ GST_OBJECT_UNLOCK (demux);
+
+ if (connection_speed) {
+ GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
+ connection_speed / 1000);
+ return connection_speed;
+ }
+
+ /* No explicit connection_speed, so choose the new variant to use as a
+ * fraction of the measured download rate */
+ target_download_rate =
+ CLAMP (stream->current_download_rate, 0,
+ G_MAXUINT) * demux->bandwidth_target_ratio;
+
+ GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
+ demux->bandwidth_target_ratio, target_download_rate);
+
+#if 0
+ /* Debugging code, modulate the bitrate every few fragments */
+ {
+ static guint ctr = 0;
+ if (ctr % 3 == 0) {
+ GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
+ target_download_rate /= 2;
+ }
+ ctr++;
+ }
+#endif
+
+ if (min_bitrate > 0 && target_download_rate < min_bitrate) {
+ target_download_rate = min_bitrate;
+ GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
+ min_bitrate);
+ }
+
+ if (max_bitrate > 0 && target_download_rate > max_bitrate) {
+ target_download_rate = max_bitrate;
+ GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
+ max_bitrate);
+ }
+
+ GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
+ target_download_rate);
+
+ return target_download_rate;
+}
+
+void
+gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
+{
+ g_free (f->uri);
+ f->uri = NULL;
+ f->range_start = 0;
+ f->range_end = -1;
+
+ g_free (f->header_uri);
+ f->header_uri = NULL;
+ f->header_range_start = 0;
+ f->header_range_end = -1;
+
+ g_free (f->index_uri);
+ f->index_uri = NULL;
+ f->index_range_start = 0;
+ f->index_range_end = -1;
+
+ f->stream_time = GST_CLOCK_STIME_NONE;
+ f->duration = GST_CLOCK_TIME_NONE;
+ f->finished = FALSE;
+}