From 72ba426c00648f89d10b80b1ad0716eee8c41030 Mon Sep 17 00:00:00 2001 From: Michael Smith Date: Wed, 5 Oct 2005 11:25:51 +0000 Subject: [PATCH] Improve seek error-resilience. Original commit message from CVS: Improve seek error-resilience. Better error handling generally. --- ChangeLog | 12 +++ gst/realmedia/rmdemux.c | 250 ++++++++++++++++++++++++++++++++++++------------ 2 files changed, 203 insertions(+), 59 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8e97c3b..76be513 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2005-10-05 Michael Smith + + * gst/realmedia/rmdemux.c: (gst_rmdemux_sink_event), + (gst_rmdemux_src_event), (gst_rmdemux_validate_offset), + (find_seek_offset_bytes), (find_seek_offset_time), + (gst_rmdemux_perform_seek), (gst_rmdemux_src_query), + (gst_rmdemux_loop), (gst_rmdemux_fourcc_isplausible), + (gst_rmdemux_chain), (gst_rmdemux_send_event), + (gst_rmdemux_add_stream), (gst_rmdemux_parse_packet): + Improve seeking error-resilience. + General improvements in error handling. + 2005-10-03 Thomas Vander Stichele * configure.ac: diff --git a/gst/realmedia/rmdemux.c b/gst/realmedia/rmdemux.c index 6b89346..fda2e6e 100644 --- a/gst/realmedia/rmdemux.c +++ b/gst/realmedia/rmdemux.c @@ -3,6 +3,7 @@ * Copyright (C) <2003> David A. Schleef * Copyright (C) <2004> Stephane Loeuillet * Copyright (C) <2005> Owen Fraser-Green + * Copyright (C) <2005> Michael Smith * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -144,8 +145,8 @@ static void gst_rmdemux_parse_data (GstRMDemux * rmdemux, const void *data, int length); static void gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const void *data, int length); -static void gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data, - guint16 version, guint16 length); +static GstFlowReturn gst_rmdemux_parse_packet (GstRMDemux * rmdemux, + const void *data, guint16 version, guint16 length); static void gst_rmdemux_parse_indx_data (GstRMDemux * rmdemux, const void *data, int length); @@ -372,9 +373,134 @@ done_unref: return ret; } +/* Validate that this looks like a reasonable point to seek to */ +static gboolean +gst_rmdemux_validate_offset (GstRMDemux * rmdemux) +{ + GstBuffer *buffer; + GstFlowReturn flowret; + guint16 version, length; + gboolean ret = TRUE; + + flowret = gst_pad_pull_range (rmdemux->sinkpad, rmdemux->offset, 4, &buffer); + + if (flowret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (rmdemux, "Failed to pull data at offset %d", + rmdemux->offset); + return FALSE; + } + + // TODO: Can we also be seeking to a 'DATA' chunk header? Check this. + // Also, for the case we currently handle, can we check any more? It's pretty + // sucky to not be validating a little more heavily than this... + /* This should now be the start of a data packet header. That begins with + * a 2-byte 'version' field, which has to be 0 or 1, then a length. I'm not + * certain what values are valid for length, but it must always be at least + * 4 bytes, and we can check that it won't take us past our known total size + */ + + version = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer)); + if (version != 0 && version != 1) { + GST_DEBUG_OBJECT (rmdemux, "Expected version 0 or 1, got %d", + (int) version); + ret = FALSE; + } + + length = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer) + 2); + // TODO: Also check against total stream length + if (length < 4) { + GST_DEBUG_OBJECT (rmdemux, "Expected length >= 4, got %d", (int) length); + ret = FALSE; + } + + if (ret) { + rmdemux->offset += 4; + gst_adapter_clear (rmdemux->adapter); + gst_adapter_push (rmdemux->adapter, buffer); + } else { + GST_WARNING_OBJECT (rmdemux, "Failed to validate seek offset at %d", + rmdemux->offset); + } + + return ret; +} + +static gboolean +find_seek_offset_bytes (GstRMDemux * rmdemux, guint target) +{ + int i, n; + gboolean ret = FALSE; + + for (n = 0; n < rmdemux->n_streams; n++) { + GstRMDemuxStream *stream; + + stream = rmdemux->streams[n]; + + /* Search backwards through this stream's index until we find the first + * timestamp before our target time */ + for (i = stream->index_length - 1; i >= 0; i--) { + if (stream->index[i].offset <= target) { + /* Set the seek_offset for the stream so we don't bother parsing it + * until we've passed that point */ + stream->seek_offset = stream->index[i].offset; + rmdemux->offset = stream->index[i].offset; + ret = TRUE; + break; + } + } + } + return ret; +} + +static gboolean +find_seek_offset_time (GstRMDemux * rmdemux, GstClockTime time) +{ + int i, n; + gboolean ret = FALSE; + GstClockTime earliest = GST_CLOCK_TIME_NONE; + + for (n = 0; n < rmdemux->n_streams; n++) { + GstRMDemuxStream *stream; + + stream = rmdemux->streams[n]; + + /* Search backwards through this stream's index until we find the first + * timestamp before our target time */ + for (i = stream->index_length - 1; i >= 0; i--) { + if (stream->index[i].timestamp <= time) { + /* Set the seek_offset for the stream so we don't bother parsing it + * until we've passed that point */ + stream->seek_offset = stream->index[i].offset; + + /* If it's also the earliest timestamp we've seen of all streams, then + * that's our target! + */ + if (earliest == GST_CLOCK_TIME_NONE || + stream->index[i].timestamp < earliest) { + earliest = stream->index[i].timestamp; + rmdemux->offset = stream->index[i].offset; + GST_DEBUG_OBJECT (rmdemux, + "We're looking for %" GST_TIME_FORMAT + " and we found that stream %d has the latest index at %" + GST_TIME_FORMAT, GST_TIME_ARGS (rmdemux->segment_start), n, + GST_TIME_ARGS (earliest)); + } + + ret = TRUE; + + break; + } + } + } + return ret; +} + static gboolean gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush) { + gboolean validated; + gboolean ret = TRUE; + /* nothing configured, play complete file */ if (rmdemux->segment_start == GST_CLOCK_TIME_NONE) rmdemux->segment_start = 0; @@ -402,56 +528,56 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush) gst_pad_pause_task (rmdemux->sinkpad); } + GST_LOG_OBJECT (rmdemux, "Done starting flushes"); + /* now grab the stream lock so that streaming cannot continue, for * non flushing seeks when the element is in PAUSED this could block * forever. */ GST_STREAM_LOCK (rmdemux->sinkpad); + GST_LOG_OBJECT (rmdemux, "Took streamlock"); + /* we need to stop flushing on the sinkpad as we're going to use it * next. We can do this as we have the STREAM lock now. */ gst_pad_push_event (rmdemux->sinkpad, gst_event_new_flush_stop ()); - /* Find the new offset */ - /* Get the video stream */ - { - int i, n; + GST_LOG_OBJECT (rmdemux, "Pushed FLUSH_STOP event"); - rmdemux->offset = 0; - GstClockTime tmp_time = rmdemux->segment_stop; - - /* Find the last offset which occurs before the seek time */ - for (n = 0; n < rmdemux->n_streams; n++) { - GstRMDemuxStream *stream; - - stream = rmdemux->streams[n]; - - for (i = stream->index_length - 1; i >= 0; i--) { - if (stream->index[i].timestamp < rmdemux->segment_start) { - /* Set the seek_offset for the stream so we don't bother parsing it - * until we've passed that point */ - stream->seek_offset = stream->index[i].offset; - if (tmp_time > stream->index[i].timestamp) { - tmp_time = stream->index[i].timestamp; - rmdemux->offset = stream->index[i].offset; - GST_DEBUG_OBJECT (rmdemux, - "We're looking for %" GST_TIME_FORMAT - " and we found that stream %d has the latest index at %" - GST_TIME_FORMAT, GST_TIME_ARGS (rmdemux->segment_start), n, - GST_TIME_ARGS (tmp_time)); - } - break; - } - } + /* For each stream, find the first index offset equal to or before our seek + * target. Of these, find the smallest offset. That's where we seek to. + * + * Then we pull 4 bytes from that offset, validate that we've seeked to a + * DATA chunk (with the DATA fourcc). + * If that fails, restart, with the seek target set to one less than the + * offset we just tried. If we run out of places to try, treat that as a fatal + * error. + */ + if (!find_seek_offset_time (rmdemux, rmdemux->segment_start)) { + GST_LOG_OBJECT (rmdemux, "Failed to find seek offset by time"); + ret = FALSE; + goto done; + } + + GST_LOG_OBJECT (rmdemux, "Validating offset %u", rmdemux->offset); + validated = gst_rmdemux_validate_offset (rmdemux); + while (!validated) { + GST_INFO_OBJECT (rmdemux, "Failed to validate offset at %u", + rmdemux->offset); + if (!find_seek_offset_bytes (rmdemux, rmdemux->offset - 1)) { + ret = FALSE; + goto done; } + validated = gst_rmdemux_validate_offset (rmdemux); } + GST_LOG_OBJECT (rmdemux, "Found final offset. Excellent!"); + /* now we have a new position, prepare for streaming again */ { GstEvent *event; /* Reset the demuxer state */ rmdemux->state = RMDEMUX_STATE_DATA_PACKET; - gst_adapter_clear (rmdemux->adapter); rmdemux->cur_timestamp = GST_CLOCK_TIME_NONE; if (flush) @@ -479,10 +605,12 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, gboolean flush) rmdemux->sinkpad); } +done: + /* streaming can continue now */ GST_STREAM_UNLOCK (rmdemux->sinkpad); - return TRUE; + return ret; } @@ -665,6 +793,7 @@ gst_rmdemux_loop (GstPad * pad) size = rmdemux->avg_packet_size; break; case RMDEMUX_STATE_EOS: + GST_LOG_OBJECT (rmdemux, "At EOS, pausing task"); goto need_pause; default: GST_LOG_OBJECT (rmdemux, "Default: requires %d bytes (state is %d)", @@ -785,14 +914,14 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer) rmdemux->size = RMDEMUX_GUINT32_GET (data + 4) - HEADER_SIZE; rmdemux->object_version = RMDEMUX_GUINT16_GET (data + 8); - /* Sanity-check, if this is after a seek we might have bad data. - * We assume that the FOURCC is printable ASCII */ + /* Sanity-check. We assume that the FOURCC is printable ASCII */ if (!gst_rmdemux_fourcc_isplausible (rmdemux->object_id)) { - /* Failed. Remain in HEADER state, try again... We flush only the - * actual FOURCC, not the entire header, because we could need to - * resync anywhere at all... really, this should never happen. */ - GST_WARNING_OBJECT (rmdemux, - "Bogus looking header, unprintable FOURCC"); + /* Failed. Remain in HEADER state, try again... We flush only + * the actual FOURCC, not the entire header, because we could + * need to resync anywhere at all... really, this should never + * happen. */ + GST_WARNING_OBJECT (rmdemux, "Bogus looking header, unprintable " + "FOURCC"); gst_adapter_flush (rmdemux->adapter, 4); break; @@ -971,14 +1100,15 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer) data = gst_adapter_peek (rmdemux->adapter, 4); length = RMDEMUX_GUINT16_GET (data + 2); - if (length == 0) { - gst_adapter_flush (rmdemux->adapter, 2); + if (length < 4) { + /* Invalid, just drop it */ + gst_adapter_flush (rmdemux->adapter, 4); } else { if (gst_adapter_available (rmdemux->adapter) < length) goto unlock; data = gst_adapter_peek (rmdemux->adapter, length); - gst_rmdemux_parse_packet (rmdemux, data + 4, version, length); + gst_rmdemux_parse_packet (rmdemux, data + 4, version, length - 4); rmdemux->chunk_index++; gst_adapter_flush (rmdemux->adapter, length); @@ -990,9 +1120,11 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer) /* Stream done */ gst_adapter_flush (rmdemux->adapter, 2); - if (rmdemux->data_offset == 0) + if (rmdemux->data_offset == 0) { + GST_LOG_OBJECT (rmdemux, + "No further data, internal demux state EOS"); rmdemux->state = RMDEMUX_STATE_EOS; - else + } else rmdemux->state = RMDEMUX_STATE_HEADER; } break; @@ -1105,9 +1237,8 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream) stream->pad = gst_pad_new_from_template (gst_static_pad_template_get (&gst_rmdemux_audiosrc_template), name); + GST_LOG_OBJECT (rmdemux, "Created audio pad \"%s\"", name); g_free (name); - GST_LOG_OBJECT (rmdemux, "Created audio pad \"%s\"", - gst_pad_get_name (stream->pad)); switch (stream->fourcc) { /* Older RealAudio Codecs */ case GST_RM_AUD_14_4: @@ -1543,7 +1674,7 @@ gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const void *data, int length) g_free (title); } -static void +static GstFlowReturn gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data, guint16 version, guint16 length) { @@ -1559,12 +1690,14 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data, "Parsing a packet for stream=%d, timestamp=" GST_TIME_FORMAT ", version=%d", id, GST_TIME_ARGS (rmdemux->cur_timestamp), version); + // TODO: This is skipping over either 2 or 3 bytes (version dependent) + // without even reading it. What are these for? if (version == 0) { data += 8; - packet_size = length - 12; + packet_size = length - 8; } else { data += 9; - packet_size = length - 13; + packet_size = length - 9; } stream = gst_rmdemux_get_stream_by_id (rmdemux, id); @@ -1572,28 +1705,27 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, const void *data, if (!stream) { GST_WARNING_OBJECT (rmdemux, "No stream for stream id %d in parsing " "data packet", id); - return; + return GST_FLOW_OK; } - if ((rmdemux->offset + packet_size) > stream->seek_offset) { + if ((rmdemux->offset + packet_size) > stream->seek_offset && + stream && stream->pad && GST_PAD_IS_USABLE (stream->pad)) { if (gst_pad_alloc_buffer (stream->pad, GST_BUFFER_OFFSET_NONE, packet_size, stream->caps, &buffer) != GST_FLOW_OK) { GST_WARNING_OBJECT (rmdemux, "failed to alloc src buffer for stream %d", id); - return; + return GST_FLOW_ERROR; } memcpy (GST_BUFFER_DATA (buffer), (guint8 *) data, packet_size); GST_BUFFER_TIMESTAMP (buffer) = rmdemux->cur_timestamp; - if (stream && stream->pad && GST_PAD_IS_USABLE (stream->pad)) { - GST_DEBUG_OBJECT (rmdemux, "Pushing buffer of size %d to pad", - packet_size); - gst_pad_push (stream->pad, buffer); - } + GST_DEBUG_OBJECT (rmdemux, "Pushing buffer of size %d to pad", packet_size); + return gst_pad_push (stream->pad, buffer); } else { GST_DEBUG_OBJECT (rmdemux, "Stream %d is skipping: seek_offset=%d, offset=%d, packet_size", stream->id, stream->seek_offset, rmdemux->offset, packet_size); + return GST_FLOW_OK; } } -- 2.7.4