From 0173afa38cc6f8a2cb810f99a24ddb1d5d5f4868 Mon Sep 17 00:00:00 2001 From: Vincent Penquerc'h Date: Sat, 13 Aug 2011 14:18:56 +0100 Subject: [PATCH] oggdemux: implement push mode seeking This patch implements seeking in push mode (eg, over the net) in Ogg, using the double bisection method. As a side effect, it also fixes duration determination of network streams, by seeking to the end to check the actual duration. Known issues: - Getting an EOS while seeking stops the streaming task, I can't find a way to prevent this (eg, by issuing a seek in the event handler). - Seeking twice in a VERY short succession with playbin2 fails for streams with subtitles, we end up pushing in a dataqueue which is flushing. Rare in normal use AFAICT. - Seeking is slow on slow links - byte ranges guesses could be made better, decreasing the number of required requests - If no granule position is found in the last 64 KB of a stream, duration will be left unknown (should be pretty rare) https://bugzilla.gnome.org/show_bug.cgi?id=621897 --- ext/ogg/gstoggdemux.c | 888 +++++++++++++++++++++++++++++++++++++++--- ext/ogg/gstoggdemux.h | 27 ++ 2 files changed, 866 insertions(+), 49 deletions(-) diff --git a/ext/ogg/gstoggdemux.c b/ext/ogg/gstoggdemux.c index ac20a4623..16f559e83 100644 --- a/ext/ogg/gstoggdemux.c +++ b/ext/ogg/gstoggdemux.c @@ -47,11 +47,30 @@ #define CHUNKSIZE (8500) /* this is out of vorbisfile */ +/* we hope we get a granpos within this many bytes off the end */ +#define DURATION_CHUNK_OFFSET (64*1024) + +/* stop duration checks within this much of EOS */ +#define EOS_AVOIDANCE_THRESHOLD 8192 + #define GST_FLOW_LIMIT GST_FLOW_CUSTOM_ERROR +#define GST_FLOW_SKIP_PUSH GST_FLOW_CUSTOM_SUCCESS_1 #define GST_CHAIN_LOCK(ogg) g_mutex_lock((ogg)->chain_lock) #define GST_CHAIN_UNLOCK(ogg) g_mutex_unlock((ogg)->chain_lock) +#define GST_PUSH_LOCK(ogg) \ + do { \ + GST_TRACE_OBJECT(ogg, "Push lock"); \ + g_mutex_lock((ogg)->push_lock); \ + } while(0) + +#define GST_PUSH_UNLOCK(ogg) \ + do { \ + GST_TRACE_OBJECT(ogg, "Push unlock"); \ + g_mutex_unlock((ogg)->push_lock); \ + } while(0) + GST_DEBUG_CATEGORY (gst_ogg_demux_debug); GST_DEBUG_CATEGORY (gst_ogg_demux_setup_debug); #define GST_CAT_DEFAULT gst_ogg_demux_debug @@ -125,6 +144,11 @@ static void gst_ogg_demux_sync_streams (GstOggDemux * ogg); GstCaps *gst_ogg_demux_set_header_on_caps (GstOggDemux * ogg, GstCaps * caps, GList * headers); +static gboolean gst_ogg_demux_send_event (GstOggDemux * ogg, GstEvent * event); +static gboolean gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, + GstEvent * event); +static gboolean gst_ogg_demux_check_duration_push (GstOggDemux * ogg, + GstSeekFlags flags, GstEvent * event); GType gst_ogg_pad_get_type (void); G_DEFINE_TYPE (GstOggPad, gst_ogg_pad, GST_TYPE_PAD); @@ -304,8 +328,7 @@ gst_ogg_pad_src_query (GstPad * pad, GstQuery * query) GstOggPad *pad = g_array_index (ogg->current_chain->streams, GstOggPad *, i); - seekable |= (pad->map.index != NULL && pad->map.n_index != 0); - + seekable = TRUE; if (pad->map.index != NULL && pad->map.n_index != 0) { GstOggIndex *idx; GstClockTime idx_time; @@ -318,6 +341,8 @@ gst_ogg_pad_src_query (GstPad * pad, GstQuery * query) stop = idx_time; else stop = MAX (idx_time, stop); + } else { + stop = -1; /* we've no clue, sadly, without seeking */ } } } @@ -455,6 +480,22 @@ gst_ogg_demux_chain_peer (GstOggPad * pad, ogg_packet * packet, cret = GST_FLOW_OK; + GST_PUSH_LOCK (ogg); + if (!ogg->pullmode && ogg->push_state == PUSH_PLAYING + && ogg->push_time_length == GST_CLOCK_TIME_NONE + && !ogg->push_disable_seeking) { + if (!ogg->building_chain) { + /* we got all headers, now try to get duration */ + if (!gst_ogg_demux_check_duration_push (ogg, GST_SEEK_FLAG_FLUSH, NULL)) { + GST_PUSH_UNLOCK (ogg); + return GST_FLOW_OK; + } + } + GST_PUSH_UNLOCK (ogg); + return GST_FLOW_OK; + } + GST_PUSH_UNLOCK (ogg); + GST_DEBUG_OBJECT (ogg, "%p streaming to peer serial %08x", pad, pad->map.serialno); @@ -704,6 +745,36 @@ gst_ogg_demux_collect_start_time (GstOggDemux * ogg, GstOggChain * chain) return start_time; } +static GstClockTime +gst_ogg_demux_collect_sync_time (GstOggDemux * ogg, GstOggChain * chain) +{ + gint i; + GstClockTime sync_time = GST_CLOCK_TIME_NONE; + + if (!chain) { + GST_WARNING_OBJECT (ogg, "No chain!"); + return GST_CLOCK_TIME_NONE; + } + + for (i = 0; i < chain->streams->len; i++) { + GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i); + + if (pad->map.is_sparse) + continue; + + if (pad->push_sync_time == GST_CLOCK_TIME_NONE) { + sync_time = GST_CLOCK_TIME_NONE; + break; + } else { + if (sync_time == GST_CLOCK_TIME_NONE) + sync_time = pad->push_sync_time; + else + sync_time = MAX (sync_time, pad->push_sync_time); + } + } + return sync_time; +} + /* submit a packet to the oggpad, this function will run the * typefind code for the pad if this is the first packet for this * stream @@ -838,7 +909,10 @@ gst_ogg_pad_submit_packet (GstOggPad * pad, ogg_packet * packet) pad->start_time = gst_ogg_stream_granule_to_time (&pad->map, start_granule); - GST_DEBUG ("start time %" G_GINT64_FORMAT, pad->start_time); + GST_DEBUG_OBJECT (ogg, + "start time %" GST_TIME_FORMAT " (%" GST_TIME_FORMAT ") for %s", + GST_TIME_ARGS (pad->start_time), GST_TIME_ARGS (pad->start_time), + gst_ogg_stream_get_media_type (&pad->map)); } else { packet->granulepos = gst_ogg_stream_granule_to_granulepos (&pad->map, pad->map.accumulated_granule, pad->keyframe_granule); @@ -887,8 +961,25 @@ gst_ogg_pad_submit_packet (GstOggPad * pad, ogg_packet * packet) segment_time = chain->begin_time; /* create the newsegment event we are going to send out */ - event = gst_event_new_new_segment (FALSE, ogg->segment.rate, - GST_FORMAT_TIME, start_time, chain->segment_stop, segment_time); + GST_PUSH_LOCK (ogg); + if (!ogg->pullmode && ogg->push_state == PUSH_LINEAR2) { + /* if we are fast forwarding to the actual seek target, + ensure previous frames are clipped */ + GST_DEBUG_OBJECT (ogg, + "Resynced, starting segment at %" GST_TIME_FORMAT + ", start_time %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->push_seek_time_original_target), + GST_TIME_ARGS (start_time)); + event = + gst_event_new_new_segment (FALSE, ogg->push_seek_rate, + GST_FORMAT_TIME, ogg->push_seek_time_original_target, -1, + ogg->push_seek_time_original_target); + ogg->push_state = PUSH_PLAYING; + } else { + event = gst_event_new_new_segment (FALSE, ogg->segment.rate, + GST_FORMAT_TIME, start_time, chain->segment_stop, segment_time); + } + GST_PUSH_UNLOCK (ogg); ogg->resync = FALSE; } @@ -1001,6 +1092,424 @@ could_not_submit: } } +static void +gst_ogg_demux_setup_bisection_bounds (GstOggDemux * ogg) +{ + if (ogg->push_last_seek_time >= ogg->push_seek_time_target) { + GST_DEBUG_OBJECT (ogg, "We overshot by %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->push_last_seek_time - ogg->push_seek_time_target)); + ogg->push_offset1 = ogg->push_last_seek_offset; + ogg->push_time1 = ogg->push_last_seek_time; + } else { + GST_DEBUG_OBJECT (ogg, "We undershot by %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->push_seek_time_target - ogg->push_last_seek_time)); + ogg->push_offset0 = ogg->push_last_seek_offset; + ogg->push_time0 = ogg->push_last_seek_time; + } +} + +static gint64 +gst_ogg_demux_estimate_bisection_target (GstOggDemux * ogg) +{ + gint64 best; + gint64 segment_bitrate; + + /* we might not know the length of the stream in time, + so push_time1 might not be set */ + GST_DEBUG_OBJECT (ogg, + "push time 1: %" GST_TIME_FORMAT ", dbytes %" G_GINT64_FORMAT, + GST_TIME_ARGS (ogg->push_time1), ogg->push_offset1 - ogg->push_offset0); + if (ogg->push_time1 == GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (ogg, + "New segment to consider: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT + ", time %" GST_TIME_FORMAT " (open ended)", ogg->push_offset0, + ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0)); + if (ogg->push_last_seek_time == ogg->push_start_time) { + /* if we're at start and don't know the end time, we can't estimate + bitrate, so get the nominal declared bitrate as a failsafe, or some + random constant which will be discarded after we made a (probably + dire) first guess */ + segment_bitrate = (ogg->bitrate > 0 ? ogg->bitrate : 1000); + } else { + segment_bitrate = + gst_util_uint64_scale (ogg->push_last_seek_offset - 0, + 8 * GST_SECOND, ogg->push_last_seek_time - ogg->push_start_time); + } + best = + ogg->push_offset0 + + gst_util_uint64_scale (ogg->push_seek_time_target - ogg->push_time0, + segment_bitrate, 8 * GST_SECOND); + } else { + GST_DEBUG_OBJECT (ogg, + "New segment to consider: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT + ", time %" GST_TIME_FORMAT " %" GST_TIME_FORMAT, ogg->push_offset0, + ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0), + GST_TIME_ARGS (ogg->push_time1)); + if (ogg->push_time0 == ogg->push_time1) { + best = ogg->push_offset0; + } else { + segment_bitrate = + gst_util_uint64_scale (ogg->push_offset1 - ogg->push_offset0, + 8 * GST_SECOND, ogg->push_time1 - ogg->push_time0); + GST_DEBUG_OBJECT (ogg, + "Local bitrate on the %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT + " segment: %" G_GINT64_FORMAT, GST_TIME_ARGS (ogg->push_time0), + GST_TIME_ARGS (ogg->push_time1)); + best = + ogg->push_offset0 + + gst_util_uint64_scale (ogg->push_seek_time_target - ogg->push_time0, + segment_bitrate, 8 * GST_SECOND); + } + } + + /* offset by typical page size */ + best -= CHUNKSIZE; + if (best < ogg->push_offset0) + best = ogg->push_offset0; + if (best < 0) + best = 0; + + return best; +} + +static void +gst_ogg_demux_record_keyframe_time (GstOggDemux * ogg, GstOggPad * pad, + ogg_int64_t granpos) +{ + gint64 kf_granule; + GstClockTime kf_time; + + kf_granule = gst_ogg_stream_granulepos_to_key_granule (&pad->map, granpos); + kf_time = gst_ogg_stream_granule_to_time (&pad->map, kf_granule); + + pad->push_kf_time = kf_time; +} + +/* returns the earliest keyframe time for all non sparse pads in the chain, + * if known, and GST_CLOCK_TIME_NONE if not */ +static GstClockTime +gst_ogg_demux_get_earliest_keyframe_time (GstOggDemux * ogg) +{ + GstClockTime t = GST_CLOCK_TIME_NONE; + GstOggChain *chain = ogg->building_chain; + int i; + + if (!chain) { + GST_WARNING_OBJECT (ogg, "No chain!"); + return GST_CLOCK_TIME_NONE; + } + for (i = 0; i < chain->streams->len; i++) { + GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i); + + if (pad->map.is_sparse) + continue; + if (pad->push_kf_time == GST_CLOCK_TIME_NONE) + return GST_CLOCK_TIME_NONE; + if (t == GST_CLOCK_TIME_NONE || pad->push_kf_time < t) + t = pad->push_kf_time; + } + + return t; +} + +/* MUST be called with the push lock locked, and will unlock it + regardless of return value. */ +static GstFlowReturn +gst_ogg_demux_seek_back_after_push_duration_check_unlock (GstOggDemux * ogg) +{ + GstEvent *event; + + /* Get the delayed event, if any */ + event = ogg->push_mode_seek_delayed_event; + ogg->push_mode_seek_delayed_event = NULL; + + ogg->push_state = PUSH_PLAYING; + + GST_PUSH_UNLOCK (ogg); + + if (event) { + /* If there is one, perform it */ + gst_ogg_demux_perform_seek_push (ogg, event); + } else { + /* If there wasn't, seek back at start to start normal playback */ + GST_INFO_OBJECT (ogg, "Seeking back to 0 after duration check"); + event = gst_event_new_seek (1.0, GST_FORMAT_BYTES, + GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, 1, GST_SEEK_TYPE_NONE, GST_CLOCK_TIME_NONE); + if (!gst_pad_push_event (ogg->sinkpad, event)) { + GST_WARNING_OBJECT (ogg, "Failed seeking back to start"); + return GST_FLOW_ERROR; + } + } + + return GST_FLOW_OK; +} + +static gboolean +gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page) +{ + GstOggDemux *ogg = pad->ogg; + ogg_int64_t granpos = ogg_page_granulepos (page); + + GST_PUSH_LOCK (ogg); + if (granpos >= 0) { + if (ogg->push_start_time == GST_CLOCK_TIME_NONE) { + ogg->push_start_time = + gst_ogg_stream_get_start_time_for_granulepos (&pad->map, granpos); + GST_DEBUG_OBJECT (ogg, "Stream start time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->push_start_time)); + } + ogg->push_time_offset = + gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos); + if (ogg->push_time_offset > 0) { + GST_DEBUG_OBJECT (ogg, "Bitrate since start: %" G_GUINT64_FORMAT, + gst_util_uint64_scale (ogg->push_byte_offset, 8 * GST_SECOND, + ogg->push_time_offset)); + } + + if (ogg->push_state == PUSH_DURATION) { + GstClockTime t = + gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos); + + if (ogg->total_time == GST_CLOCK_TIME_NONE || t > ogg->total_time) { + GST_DEBUG_OBJECT (ogg, "New total time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (t)); + ogg->total_time = t; + ogg->push_time_length = t; + } + + /* If we were determining the duration of the stream, we're now done, + and can get back to sending the original event we delayed. + We stop a bit before the end of the stream, as if we get a EOS + event and there is a queue2 upstream (such as when using playbin2), + it will pause the task *after* we come back from the EOS handler, + so we cannot prevent the pausing by issuing a seek. */ + if (ogg->push_byte_offset + EOS_AVOIDANCE_THRESHOLD >= + ogg->push_byte_length) { + GstMessage *message; + GstFlowReturn res; + + /* tell the pipeline we've just found out the duration */ + GST_INFO_OBJECT (ogg, "New duration found: %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->total_time)); + message = + gst_message_new_duration (GST_OBJECT (ogg), GST_FORMAT_TIME, + ogg->total_time); + gst_element_post_message (GST_ELEMENT (ogg), message); + + GST_DEBUG_OBJECT (ogg, + "We're close enough to the end, and we're scared " + "to get too close, seeking back to start"); + + res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg); + if (res != GST_FLOW_OK) + return res; + return GST_FLOW_SKIP_PUSH; + } else { + GST_PUSH_UNLOCK (ogg); + } + return GST_FLOW_SKIP_PUSH; + } + } + + /* if we're seeking, look at time, and decide what to do */ + if (ogg->push_state != PUSH_PLAYING && ogg->push_state != PUSH_LINEAR2) { + GstClockTime t; + gint64 best = -1; + GstEvent *sevent; + int res; + gboolean close_enough; + + /* ignore -1 granpos when seeking, we want to sync on a real granpos */ + if (granpos < 0) { + GST_PUSH_UNLOCK (ogg); + if (ogg_stream_pagein (&pad->map.stream, page) != 0) + goto choked; + return GST_FLOW_SKIP_PUSH; + } + + t = gst_ogg_stream_get_end_time_for_granulepos (&pad->map, granpos); + + if (ogg->push_state == PUSH_BISECT1 || ogg->push_state == PUSH_BISECT2) { + GstClockTime sync_time; + + if (pad->push_sync_time == GST_CLOCK_TIME_NONE) + pad->push_sync_time = t; + GST_DEBUG_OBJECT (ogg, "Got timestamp %" GST_TIME_FORMAT " for %s", + GST_TIME_ARGS (t), gst_ogg_stream_get_media_type (&pad->map)); + sync_time = gst_ogg_demux_collect_sync_time (ogg, ogg->building_chain); + if (sync_time == GST_CLOCK_TIME_NONE) { + GST_PUSH_UNLOCK (ogg); + GST_DEBUG_OBJECT (ogg, + "Not enough timing info collected for sync, waiting for more"); + if (ogg_stream_pagein (&pad->map.stream, page) != 0) + goto choked; + return GST_FLOW_SKIP_PUSH; + } + ogg->push_last_seek_time = sync_time; + + GST_DEBUG_OBJECT (ogg, + "Bisection just seeked at %" G_GINT64_FORMAT ", time %" + GST_TIME_FORMAT ", target was %" GST_TIME_FORMAT, + ogg->push_last_seek_offset, + GST_TIME_ARGS (ogg->push_last_seek_time), + GST_TIME_ARGS (ogg->push_seek_time_target)); + + if (ogg->push_time1 != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (ogg, + "Interval was %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT " (%" + G_GINT64_FORMAT "), time %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT + " (%" GST_TIME_FORMAT ")", ogg->push_offset0, ogg->push_offset1, + ogg->push_offset1 - ogg->push_offset0, + GST_TIME_ARGS (ogg->push_time0), GST_TIME_ARGS (ogg->push_time1), + GST_TIME_ARGS (ogg->push_time1 - ogg->push_time0)); + } else { + GST_DEBUG_OBJECT (ogg, + "Interval was %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT " (%" + G_GINT64_FORMAT "), time %" GST_TIME_FORMAT " - unknown", + ogg->push_offset0, ogg->push_offset1, + ogg->push_offset1 - ogg->push_offset0, + GST_TIME_ARGS (ogg->push_time0)); + } + + gst_ogg_demux_setup_bisection_bounds (ogg); + + best = gst_ogg_demux_estimate_bisection_target (ogg); + + if (ogg->push_seek_time_target == 0) { + GST_DEBUG_OBJECT (ogg, "Seeking to 0, deemed close enough"); + close_enough = (ogg->push_last_seek_time == 0); + } else { + /* TODO: make this dependent on framerate ? */ + GstClockTime threshold = GST_SECOND / 2; + + /* We want to be within half a second before the target */ + if (threshold > ogg->push_seek_time_target) + threshold = ogg->push_seek_time_target; + close_enough = ogg->push_last_seek_time < ogg->push_seek_time_target + && ogg->push_last_seek_time >= + ogg->push_seek_time_target - threshold; + GST_DEBUG_OBJECT (ogg, + "testing if we're close enough: %" GST_TIME_FORMAT " <= %" + GST_TIME_FORMAT " < %" GST_TIME_FORMAT " ? %s", + GST_TIME_ARGS (ogg->push_seek_time_target - threshold), + GST_TIME_ARGS (ogg->push_last_seek_time), + GST_TIME_ARGS (ogg->push_seek_time_target), + close_enough ? "Yes" : "No"); + } + + if (close_enough || best == ogg->push_last_seek_offset) { + if (ogg->push_state == PUSH_BISECT1) { + /* we now know the time segment we'll have to search for + the second bisection */ + ogg->push_time0 = ogg->push_start_time; + ogg->push_offset0 = 0; + + GST_DEBUG_OBJECT (ogg, + "Seek to %" GST_TIME_FORMAT + " (%lx) done, now gathering pages for all non-sparse streams", + GST_TIME_ARGS (ogg->push_seek_time_target), (long) granpos); + ogg->push_state = PUSH_LINEAR1; + } else { + /* If we're asked for an accurate seek, we'll go forward till + we get to the original seek target time, else we'll just drop + here at the keyframe */ + if (ogg->push_seek_flags & GST_SEEK_FLAG_ACCURATE) { + GST_INFO_OBJECT (ogg, + "Seek to keyframe at %" GST_TIME_FORMAT " done (we're at %" + GST_TIME_FORMAT "), skipping to original target (%" + GST_TIME_FORMAT ")", + GST_TIME_ARGS (ogg->push_seek_time_target), + GST_TIME_ARGS (sync_time), + GST_TIME_ARGS (ogg->push_seek_time_original_target)); + ogg->push_state = PUSH_LINEAR2; + } else { + GST_DEBUG_OBJECT (ogg, "Seek to keyframe done, playing"); + + /* we're synced to the seek target, so flush stream and stuff + any queued pages into the stream so we start decoding there */ + ogg->push_state = PUSH_PLAYING; + } + GST_INFO_OBJECT (ogg, "Bisection needed %d + %d steps", + ogg->push_bisection_steps[0], ogg->push_bisection_steps[1]); + } + } + } else if (ogg->push_state == PUSH_LINEAR1) { + if (pad->push_kf_time == GST_CLOCK_TIME_NONE) { + GstClockTime earliest_keyframe_time; + + gst_ogg_demux_record_keyframe_time (ogg, pad, granpos); + GST_DEBUG_OBJECT (ogg, + "Previous keyframe for %s stream at %" GST_TIME_FORMAT, + gst_ogg_stream_get_media_type (&pad->map), + GST_TIME_ARGS (pad->push_kf_time)); + earliest_keyframe_time = gst_ogg_demux_get_earliest_keyframe_time (ogg); + if (earliest_keyframe_time != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (ogg, + "All non sparse streams now have a previous keyframe time," + "bisecting again to %" GST_TIME_FORMAT, + GST_TIME_ARGS (earliest_keyframe_time)); + ogg->push_seek_time_target = earliest_keyframe_time; + + ogg->push_state = PUSH_BISECT2; + best = gst_ogg_demux_estimate_bisection_target (ogg); + } + } + } + + if (ogg->push_state == PUSH_BISECT1 || ogg->push_state == PUSH_BISECT2) { + gint i; + + ogg_sync_reset (&ogg->sync); + for (i = 0; i < ogg->building_chain->streams->len; i++) { + GstOggPad *pad = + g_array_index (ogg->building_chain->streams, GstOggPad *, i); + + pad->push_sync_time = GST_CLOCK_TIME_NONE; + ogg_stream_reset (&pad->map.stream); + } + + GST_DEBUG_OBJECT (ogg, + "seeking to %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, best, + (gint64) - 1); + /* do seek */ + g_assert (best != -1); + ogg->push_bisection_steps[ogg->push_state == PUSH_BISECT2 ? 1 : 0]++; + sevent = + gst_event_new_seek (ogg->push_seek_rate, GST_FORMAT_BYTES, + ogg->push_seek_flags, GST_SEEK_TYPE_SET, best, + GST_SEEK_TYPE_NONE, -1); + + GST_PUSH_UNLOCK (ogg); + res = gst_pad_push_event (ogg->sinkpad, sevent); + if (!res) { + /* We failed to send the seek event, notify the pipeline */ + GST_ELEMENT_ERROR (ogg, RESOURCE, SEEK, (NULL), ("Failed to seek")); + return GST_FLOW_ERROR; + } + return GST_FLOW_SKIP_PUSH; + } + + if (ogg->push_state != PUSH_PLAYING) { + GST_PUSH_UNLOCK (ogg); + return GST_FLOW_SKIP_PUSH; + } + } + GST_PUSH_UNLOCK (ogg); + + return GST_FLOW_OK; + +choked: + { + GST_WARNING_OBJECT (ogg, + "ogg stream choked on page (serial %08x), " + "resetting stream", pad->map.serialno); + gst_ogg_pad_reset (pad); + /* we continue to recover */ + return GST_FLOW_SKIP_PUSH; + } +} + /* submit a page to an oggpad, this function will then submit all * the packets in the page. */ @@ -1013,7 +1522,7 @@ gst_ogg_pad_submit_page (GstOggPad * pad, ogg_page * page) ogg = pad->ogg; - /* for negative rates we read pages backwards and must therefore be carefull + /* for negative rates we read pages backwards and must therefore be careful * with continued pages */ if (ogg->segment.rate < 0.0) { gint npackets; @@ -1039,6 +1548,15 @@ gst_ogg_pad_submit_page (GstOggPad * pad, ogg_page * page) } } + /* keep track of time in push mode */ + if (!ogg->pullmode) { + result = gst_ogg_pad_handle_push_mode_state (pad, page); + if (result == GST_FLOW_SKIP_PUSH) + return GST_FLOW_OK; + if (result != GST_FLOW_OK) + return result; + } + if (ogg_stream_pagein (&pad->map.stream, page) != 0) goto choked; @@ -1275,7 +1793,6 @@ static gboolean gst_ogg_demux_sink_activate_push (GstPad * sinkpad, gboolean active); static GstStateChangeReturn gst_ogg_demux_change_state (GstElement * element, GstStateChange transition); -static gboolean gst_ogg_demux_send_event (GstOggDemux * ogg, GstEvent * event); static void gst_ogg_print (GstOggDemux * demux); @@ -1327,6 +1844,7 @@ gst_ogg_demux_init (GstOggDemux * ogg, GstOggDemuxClass * g_class) gst_element_add_pad (GST_ELEMENT (ogg), ogg->sinkpad); ogg->chain_lock = g_mutex_new (); + ogg->push_lock = g_mutex_new (); ogg->chains = g_array_new (FALSE, TRUE, sizeof (GstOggChain *)); ogg->newsegment = NULL; @@ -1341,6 +1859,7 @@ gst_ogg_demux_finalize (GObject * object) g_array_free (ogg->chains, TRUE); g_mutex_free (ogg->chain_lock); + g_mutex_free (ogg->push_lock); ogg_sync_clear (&ogg->sync); if (ogg->newsegment) @@ -1366,6 +1885,7 @@ gst_ogg_demux_reset_streams (GstOggDemux * ogg) stream->map.accumulated_granule = 0; } ogg->building_chain = chain; + GST_DEBUG_OBJECT (ogg, "Resetting current chain"); ogg->current_chain = NULL; ogg->resync = TRUE; } @@ -1386,16 +1906,57 @@ gst_ogg_demux_sink_event (GstPad * pad, GstEvent * event) GST_DEBUG_OBJECT (ogg, "got a flush stop event"); ogg_sync_reset (&ogg->sync); res = gst_ogg_demux_send_event (ogg, event); - gst_ogg_demux_reset_streams (ogg); + if (ogg->pullmode || ogg->push_state != PUSH_DURATION) { + /* it's starting to feel reaaaally dirty :( + if we're on a spliced seek to get duration, don't reset streams, + we'll need them for the delayed seek */ + gst_ogg_demux_reset_streams (ogg); + } break; case GST_EVENT_NEWSEGMENT: GST_DEBUG_OBJECT (ogg, "got a new segment event"); + { + gboolean update; + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, + &format, &start, &stop, &time); + if (format == GST_FORMAT_BYTES) { + if (!ogg->pullmode) { + GST_PUSH_LOCK (ogg); + ogg->push_byte_offset = start; + ogg->push_last_seek_offset = start; + GST_PUSH_UNLOCK (ogg); + } + } + } gst_event_unref (event); res = TRUE; break; case GST_EVENT_EOS: { GST_DEBUG_OBJECT (ogg, "got an EOS event"); +#if 0 + /* This would be what is needed (recover from EOS by going on to + the next step (issue the delayed seek)), but it does not work + if there is a queue2 upstream - see more details comment in + gst_ogg_pad_submit_page. + If I could find a way to bypass queue2 behavior, this should + be enabled. */ + GST_PUSH_LOCK (ogg); + if (ogg->push_state == PUSH_DURATION) { + GST_DEBUG_OBJECT (ogg, "Got EOS while determining length"); + res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg); + if (res != GST_FLOW_OK) { + GST_DEBUG_OBJECT (ogg, "Error seeking back after duration check: %d", + res); + } + break; + } + GST_PUSH_UNLOCK (ogg); +#endif res = gst_ogg_demux_send_event (ogg, event); if (ogg->current_chain == NULL) { GST_ELEMENT_ERROR (ogg, STREAM, DEMUX, (NULL), @@ -1436,6 +1997,12 @@ gst_ogg_demux_submit_buffer (GstOggDemux * ogg, GstBuffer * buffer) if (G_UNLIKELY (ogg_sync_wrote (&ogg->sync, size) < 0)) goto write_failed; + if (!ogg->pullmode) { + GST_PUSH_LOCK (ogg); + ogg->push_byte_offset += size; + GST_PUSH_UNLOCK (ogg); + } + done: gst_buffer_unref (buffer); @@ -1706,11 +2273,9 @@ gst_ogg_demux_deactivate_current_chain (GstOggDemux * ogg) pad->added = FALSE; } - /* if we cannot seek back to the chain, we can destroy the chain - * completely */ - if (!ogg->pullmode) { - gst_ogg_chain_free (chain); - } + /* With push mode seeking implemented, we can now seek back to the chain, + so we do not destroy it */ + GST_DEBUG_OBJECT (ogg, "Resetting current chain"); ogg->current_chain = NULL; return TRUE; @@ -1757,6 +2322,23 @@ gst_ogg_demux_set_header_on_caps (GstOggDemux * ogg, GstCaps * caps, return caps; } +static void +gst_ogg_demux_push_queued_buffers (GstOggDemux * ogg, GstOggPad * pad) +{ + GList *walk; + + /* push queued packets */ + for (walk = pad->map.queued; walk; walk = g_list_next (walk)) { + ogg_packet *p = walk->data; + + gst_ogg_demux_chain_peer (pad, p, TRUE); + _ogg_packet_free (p); + } + /* and free the queued buffers */ + g_list_free (pad->map.queued); + pad->map.queued = NULL; +} + static gboolean gst_ogg_demux_activate_chain (GstOggDemux * ogg, GstOggChain * chain, GstEvent * event) @@ -1769,6 +2351,11 @@ gst_ogg_demux_activate_chain (GstOggDemux * ogg, GstOggChain * chain, if (chain == ogg->current_chain) { if (event) gst_event_unref (event); + + for (i = 0; i < chain->streams->len; i++) { + GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i); + gst_ogg_demux_push_queued_buffers (ogg, pad); + } return TRUE; } @@ -1809,6 +2396,7 @@ gst_ogg_demux_activate_chain (GstOggDemux * ogg, GstOggChain * chain, /* after adding the new pads, remove the old pads */ gst_ogg_demux_deactivate_current_chain (ogg); + GST_DEBUG_OBJECT (ogg, "Setting current chain to %p", chain); ogg->current_chain = chain; /* we are finished now */ @@ -1853,16 +2441,7 @@ gst_ogg_demux_activate_chain (GstOggDemux * ogg, GstOggChain * chain, } GST_DEBUG_OBJECT (ogg, "pushing queued buffers"); - /* push queued packets */ - for (walk = pad->map.queued; walk; walk = g_list_next (walk)) { - ogg_packet *p = walk->data; - - gst_ogg_demux_chain_peer (pad, p, TRUE); - _ogg_packet_free (p); - } - /* and free the queued buffers */ - g_list_free (pad->map.queued); - pad->map.queued = NULL; + gst_ogg_demux_push_queued_buffers (ogg, pad); } return TRUE; } @@ -2486,6 +3065,85 @@ no_chain: } } +static gboolean +gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags) +{ + /* In push mode, we get to the end of the stream to get the duration */ + gint64 position; + GstEvent *sevent; + gboolean res; + + /* A full Ogg page can be almost 64 KB. There's no guarantee that there'll be a + granpos there, but it's fairly likely */ + position = + ogg->push_byte_length - DURATION_CHUNK_OFFSET - EOS_AVOIDANCE_THRESHOLD; + if (position < 0) + position = 0; + + GST_DEBUG_OBJECT (ogg, + "Getting duration, seeking near the end, to %" G_GINT64_FORMAT, position); + ogg->push_state = PUSH_DURATION; + /* do not read the last byte */ + sevent = gst_event_new_seek (1.0, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET, + position, GST_SEEK_TYPE_SET, ogg->push_byte_length - 1); + res = gst_pad_push_event (ogg->sinkpad, sevent); + if (res) { + GST_DEBUG_OBJECT (ogg, "Seek succesful"); + return TRUE; + } else { + GST_INFO_OBJECT (ogg, "Seek failed, duration will stay unknown"); + ogg->push_state = PUSH_PLAYING; + return FALSE; + } +} + +static gboolean +gst_ogg_demux_check_duration_push (GstOggDemux * ogg, GstSeekFlags flags, + GstEvent * event) +{ + if (ogg->push_byte_length < 0) { + GstPad *peer; + + GST_DEBUG_OBJECT (ogg, "Trying to find byte/time length"); + if ((peer = gst_pad_get_peer (ogg->sinkpad)) != NULL) { + GstFormat format = GST_FORMAT_BYTES; + gint64 length; + int res; + + res = gst_pad_query_duration (peer, &format, &length); + if (res && length > 0) { + ogg->push_byte_length = length; + GST_DEBUG_OBJECT (ogg, + "File byte length %" G_GINT64_FORMAT, ogg->push_byte_length); + } + format = GST_FORMAT_TIME; + res = gst_pad_query_duration (peer, &format, &length); + gst_object_unref (peer); + if (res && length >= 0) { + ogg->push_time_length = length; + GST_DEBUG_OBJECT (ogg, "File time length %" GST_TIME_FORMAT, + GST_TIME_ARGS (ogg->push_time_length)); + } else if (!ogg->push_disable_seeking) { + gboolean res; + + res = gst_ogg_demux_get_duration_push (ogg, flags); + if (res) { + GST_DEBUG_OBJECT (ogg, + "File time length unknown, trying to determine"); + ogg->push_mode_seek_delayed_event = NULL; + if (event) { + GST_DEBUG_OBJECT (ogg, + "Let me intercept this innocent looking seek request"); + ogg->push_mode_seek_delayed_event = gst_event_copy (event); + } + return FALSE; + } + } + } + } + return TRUE; +} + static gboolean gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event) { @@ -2499,6 +3157,9 @@ gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event) GstEvent *sevent; GstOggChain *chain; gint64 best, best_time; + gint i; + + GST_DEBUG_OBJECT (ogg, "Push mode seek request received"); gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, &stop_type, &stop); @@ -2508,39 +3169,129 @@ gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event) goto error; } + if (start_type != GST_SEEK_TYPE_SET) { + GST_DEBUG_OBJECT (ogg, "can only seek to a SET target"); + goto error; + } + + if (!(flags & GST_SEEK_FLAG_FLUSH)) { + GST_DEBUG_OBJECT (ogg, "can only do flushing seeks"); + goto error; + } + + GST_DEBUG_OBJECT (ogg, "Push mode seek request: %" GST_TIME_FORMAT, + GST_TIME_ARGS (start)); + chain = ogg->current_chain; - if (!chain) + if (!chain) { + GST_WARNING_OBJECT (ogg, "No chain to seek on"); + goto error; + } + + /* start accessing push_* members */ + GST_PUSH_LOCK (ogg); + + /* not if we disabled seeking (chained streams) */ + if (ogg->push_disable_seeking) { + GST_DEBUG_OBJECT (ogg, "Seeking disabled"); + goto error_locked; + } + + /* not when we're trying to work out duration */ + if (ogg->push_state == PUSH_DURATION) { + GST_DEBUG_OBJECT (ogg, "Busy working out duration, try again later"); + goto error_locked; + } + + /* actually, not if we're doing any seeking already */ + if (ogg->push_state != PUSH_PLAYING) { + GST_DEBUG_OBJECT (ogg, "Already doing some seeking, try again later"); + goto error_locked; + } + + /* on the first seek, get length if we can */ + if (!gst_ogg_demux_check_duration_push (ogg, flags, event)) { + GST_PUSH_UNLOCK (ogg); return FALSE; + } if (do_index_search (ogg, chain, 0, -1, 0, -1, start, &best, &best_time)) { /* the index gave some result */ GST_DEBUG_OBJECT (ogg, "found offset %" G_GINT64_FORMAT " with time %" G_GUINT64_FORMAT, best, best_time); - start = best; - } else if ((bitrate = ogg->bitrate) > 0) { - /* try with bitrate convert the seek positions to bytes */ - if (start_type != GST_SEEK_TYPE_NONE) { - start = gst_util_uint64_scale (start, bitrate, 8 * GST_SECOND); - } - if (stop_type != GST_SEEK_TYPE_NONE) { - stop = gst_util_uint64_scale (stop, bitrate, 8 * GST_SECOND); - } } else { - /* we don't know */ - res = FALSE; + if (ogg->push_time_length > 0) { + /* if we know the time length, we know the full segment bitrate */ + GST_DEBUG_OBJECT (ogg, "Using real file bitrate"); + bitrate = + gst_util_uint64_scale (ogg->push_byte_length, 8 * GST_SECOND, + ogg->push_time_length); + } else if (ogg->push_time_offset > 0) { + /* get a first approximation using known bitrate to the current position */ + GST_DEBUG_OBJECT (ogg, "Using file bitrate so far"); + bitrate = + gst_util_uint64_scale (ogg->push_byte_offset, 8 * GST_SECOND, + ogg->push_time_offset); + } else if (ogg->bitrate > 0) { + /* nominal bitrate is better than nothing, even if it lies often */ + GST_DEBUG_OBJECT (ogg, "Using nominal bitrate"); + bitrate = ogg->bitrate; + } else { + /* meh */ + GST_DEBUG_OBJECT (ogg, + "At stream start, and no nominal bitrate, using some random magic " + "number to seed"); + /* the bisection, once started, should give us a better approximation */ + bitrate = 1000; + } + best = gst_util_uint64_scale (start, bitrate, 8 * GST_SECOND); } - if (res) { - GST_DEBUG_OBJECT (ogg, - "seeking to %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, start, stop); - /* do seek */ - sevent = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, - start_type, start, stop_type, stop); - - res = gst_pad_push_event (ogg->sinkpad, sevent); + /* offset by typical page length, and ensure our best guess is within + reasonable bounds */ + best -= CHUNKSIZE; + if (best < 0) + best = 0; + if (ogg->push_byte_length > 0 && best >= ogg->push_byte_length) + best = ogg->push_byte_length - 1; + + /* set up bisection search */ + ogg->push_offset0 = 0; + ogg->push_offset1 = ogg->push_byte_length - 1; + ogg->push_time0 = ogg->push_start_time; + ogg->push_time1 = ogg->push_time_length; + ogg->push_seek_time_target = start; + ogg->push_seek_time_original_target = start; + ogg->push_state = PUSH_BISECT1; + + /* reset pad push mode seeking state */ + for (i = 0; i < chain->streams->len; i++) { + GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, i); + pad->push_kf_time = GST_CLOCK_TIME_NONE; + pad->push_sync_time = GST_CLOCK_TIME_NONE; } + GST_DEBUG_OBJECT (ogg, + "Setting up bisection search for %" G_GINT64_FORMAT " - %" G_GINT64_FORMAT + " (time %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT ")", ogg->push_offset0, + ogg->push_offset1, GST_TIME_ARGS (ogg->push_time0), + GST_TIME_ARGS (ogg->push_time1)); + GST_DEBUG_OBJECT (ogg, + "Target time is %" GST_TIME_FORMAT ", best first guess is %" + G_GINT64_FORMAT, GST_TIME_ARGS (ogg->push_seek_time_target), best); + + ogg->push_seek_rate = rate; + ogg->push_seek_flags = flags; + ogg->push_mode_seek_delayed_event = NULL; + ogg->push_bisection_steps[0] = 1; + ogg->push_bisection_steps[1] = 0; + sevent = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, + start_type, best, GST_SEEK_TYPE_NONE, -1); + + GST_PUSH_UNLOCK (ogg); + res = gst_pad_push_event (ogg->sinkpad, sevent); + return res; /* ERRORS */ @@ -2549,6 +3300,10 @@ error: GST_DEBUG_OBJECT (ogg, "seek failed"); return FALSE; } + +error_locked: + GST_PUSH_UNLOCK (ogg); + goto error; } static gboolean @@ -3142,11 +3897,31 @@ gst_ogg_demux_handle_page (GstOggDemux * ogg, ogg_page * page) if (pad) { result = gst_ogg_pad_submit_page (pad, page); } else { - /* no pad. This means an ogg page without bos has been seen for this - * serialno. we just ignore it but post a warning... */ - GST_ELEMENT_WARNING (ogg, STREAM, DECODE, - (NULL), ("unknown ogg pad for serial %08x detected", serialno)); - return GST_FLOW_OK; + GST_PUSH_LOCK (ogg); + if (!ogg->pullmode && !ogg->push_disable_seeking) { + /* no pad while probing for duration, we must have a chained stream, + and we don't support them, so back off */ + GST_INFO_OBJECT (ogg, "We seem to have a chained stream, we won't seek"); + if (ogg->push_state == PUSH_DURATION) { + GstFlowReturn res; + + res = gst_ogg_demux_seek_back_after_push_duration_check_unlock (ogg); + if (res != GST_FLOW_OK) + return res; + } + + /* only once we seeked back */ + GST_PUSH_LOCK (ogg); + ogg->push_disable_seeking = TRUE; + } else { + GST_PUSH_UNLOCK (ogg); + /* no pad. This means an ogg page without bos has been seen for this + * serialno. we just ignore it but post a warning... */ + GST_ELEMENT_WARNING (ogg, STREAM, DECODE, + (NULL), ("unknown ogg pad for serial %08x detected", serialno)); + return GST_FLOW_OK; + } + GST_PUSH_UNLOCK (ogg); } return result; @@ -3171,8 +3946,11 @@ gst_ogg_demux_chain (GstPad * pad, GstBuffer * buffer) ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad)); - GST_DEBUG_OBJECT (ogg, "chain"); + GST_DEBUG_OBJECT (ogg, "enter"); result = gst_ogg_demux_submit_buffer (ogg, buffer); + if (result < 0) { + GST_DEBUG_OBJECT (ogg, "gst_ogg_demux_submit_buffer returned %d", result); + } while (result == GST_FLOW_OK) { ogg_page page; @@ -3186,11 +3964,15 @@ gst_ogg_demux_chain (GstPad * pad, GstBuffer * buffer) GST_DEBUG_OBJECT (ogg, "discont in page found, continuing"); } else { result = gst_ogg_demux_handle_page (ogg, &page); + if (result < 0) { + GST_DEBUG_OBJECT (ogg, "gst_ogg_demux_handle_page returned %d", result); + } } } if (ret == 0 || result == GST_FLOW_OK) { gst_ogg_demux_sync_streams (ogg); } + GST_DEBUG_OBJECT (ogg, "leave with %d", result); return result; } @@ -3608,6 +4390,14 @@ gst_ogg_demux_change_state (GstElement * element, GstStateChange transition) ogg->bitrate = 0; ogg->segment_running = FALSE; ogg->total_time = -1; + GST_PUSH_LOCK (ogg); + ogg->push_byte_offset = 0; + ogg->push_byte_length = -1; + ogg->push_time_length = GST_CLOCK_TIME_NONE; + ogg->push_time_offset = GST_CLOCK_TIME_NONE; + ogg->push_disable_seeking = FALSE; + ogg->push_state = PUSH_PLAYING; + GST_PUSH_UNLOCK (ogg); gst_segment_init (&ogg->segment, GST_FORMAT_TIME); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: diff --git a/ext/ogg/gstoggdemux.h b/ext/ogg/gstoggdemux.h index 031627baf..712dc8f05 100644 --- a/ext/ogg/gstoggdemux.h +++ b/ext/ogg/gstoggdemux.h @@ -113,6 +113,10 @@ struct _GstOggPad gboolean is_eos; gboolean added; + + /* push mode seeking */ + GstClockTime push_kf_time; + GstClockTime push_sync_time; }; struct _GstOggPadClass @@ -162,6 +166,29 @@ struct _GstOggDemux gint64 basetime; gint64 prestime; + /* push mode seeking support */ + GMutex *push_lock; /* we need the lock to protect the push mode variables */ + gint64 push_byte_offset; /* where were are at in the stream, in bytes */ + gint64 push_byte_length; /* length in bytes of the stream, -1 if unknown */ + GstClockTime push_time_length; /* length in time of the stream */ + GstClockTime push_start_time; /* start time of the stream */ + GstClockTime push_time_offset; /* where were are at in the stream, in time */ + enum { PUSH_PLAYING, PUSH_DURATION, PUSH_BISECT1, PUSH_LINEAR1, PUSH_BISECT2, PUSH_LINEAR2 } push_state; + + GstClockTime push_seek_time_original_target; + GstClockTime push_seek_time_target; + gint64 push_last_seek_offset; + GstClockTime push_last_seek_time; + gint64 push_offset0, push_offset1; /* bisection search offset bounds */ + GstClockTime push_time0, push_time1; /* bisection search time bounds */ + + double push_seek_rate; + GstSeekFlags push_seek_flags; + GstEvent *push_mode_seek_delayed_event; + gboolean push_disable_seeking; + + gint push_bisection_steps[2]; + /* ogg stuff */ ogg_sync_state sync; }; -- 2.34.1