From cc8d04dad524b6f476073e0f387abdf77b891a7e Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 29 Sep 2010 16:12:42 +0200 Subject: [PATCH] baseparse: support reverse playback ... in pull mode or upstream driven. --- gst/audioparsers/gstbaseparse.c | 351 ++++++++++++++++++++++++++++++++++------ 1 file changed, 304 insertions(+), 47 deletions(-) diff --git a/gst/audioparsers/gstbaseparse.c b/gst/audioparsers/gstbaseparse.c index 90ea9c5..bec23ff 100644 --- a/gst/audioparsers/gstbaseparse.c +++ b/gst/audioparsers/gstbaseparse.c @@ -269,6 +269,12 @@ struct _GstBaseParsePrivate /* seek events are temporarily kept to match them with newsegments */ GSList *pending_seeks; + /* reverse playback */ + GSList *buffers_pending; + GSList *buffers_queued; + GstClockTime last_ts; + gint64 last_offset; + /* property */ /* number of initial frames to discard */ gint skip; @@ -377,6 +383,24 @@ static void gst_base_parse_drain (GstBaseParse * parse); static void gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min, gboolean post_avg, gboolean post_max); +static gint64 gst_base_parse_find_offset (GstBaseParse * parse, + GstClockTime time, gboolean before, GstClockTime * _ts); + +static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse, + gboolean push_only); + +static void +gst_base_parse_clear_queues (GstBaseParse * parse) +{ + g_slist_foreach (parse->priv->buffers_queued, (GFunc) gst_buffer_unref, NULL); + g_slist_free (parse->priv->buffers_queued); + parse->priv->buffers_queued = NULL; + g_slist_foreach (parse->priv->buffers_pending, (GFunc) gst_buffer_unref, + NULL); + g_slist_free (parse->priv->buffers_pending); + parse->priv->buffers_pending = NULL; +} + static void gst_base_parse_finalize (GObject * object) { @@ -410,6 +434,8 @@ gst_base_parse_finalize (GObject * object) parse->priv->index = NULL; } + gst_base_parse_clear_queues (parse); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -540,6 +566,9 @@ gst_base_parse_reset (GstBaseParse * parse) parse->priv->idx_interval = 0; parse->priv->exact_position = TRUE; + parse->priv->last_ts = GST_CLOCK_TIME_NONE; + parse->priv->last_offset = 0; + if (parse->pending_segment) gst_event_unref (parse->pending_segment); @@ -829,11 +858,15 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) /* but finish the current segment */ GST_DEBUG_OBJECT (parse, "draining current segment"); - gst_base_parse_drain (parse); + if (parse->segment.rate > 0.0) + gst_base_parse_drain (parse); + else + gst_base_parse_process_fragment (parse, FALSE); gst_adapter_clear (parse->adapter); parse->priv->offset = offset; parse->priv->sync_offset = offset; parse->priv->next_ts = next_ts; + parse->priv->last_ts = GST_CLOCK_TIME_NONE; parse->priv->discont = TRUE; break; } @@ -849,12 +882,17 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) case GST_EVENT_FLUSH_STOP: gst_adapter_clear (parse->adapter); + gst_base_parse_clear_queues (parse); parse->priv->flushing = FALSE; parse->priv->discont = TRUE; + parse->priv->last_ts = GST_CLOCK_TIME_NONE; break; case GST_EVENT_EOS: - gst_base_parse_drain (parse); + if (parse->segment.rate > 0.0) + gst_base_parse_drain (parse); + else + gst_base_parse_process_fragment (parse, FALSE); /* If we STILL have zero frames processed, fire an error */ if (parse->priv->framecount == 0) { @@ -1402,7 +1440,7 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer) gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad)); /* segment adjustment magic; only if we are running the whole show */ - if (!parse->priv->passthrough && + if (!parse->priv->passthrough && parse->segment.rate > 0.0 && (parse->priv->pad_mode == GST_ACTIVATE_PULL || parse->priv->upstream_seekable)) { /* segment times are typically estimates, @@ -1537,14 +1575,22 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer) gst_buffer_unref (buffer); ret = GST_FLOW_OK; } else if (ret == GST_FLOW_OK) { - if (G_LIKELY (!parse->priv->skip)) { - ret = gst_pad_push (parse->srcpad, buffer); - GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s", - GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret)); + if (parse->segment.rate > 0.0) { + if (G_LIKELY (!parse->priv->skip)) { + ret = gst_pad_push (parse->srcpad, buffer); + GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s", + GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret)); + } else { + GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded", + GST_BUFFER_SIZE (buffer)); + parse->priv->skip--; + } } else { - GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded", + GST_LOG_OBJECT (parse, "frame (%d bytes) queued for now: %d", GST_BUFFER_SIZE (buffer)); - parse->priv->skip--; + parse->priv->buffers_queued = + g_slist_prepend (parse->priv->buffers_queued, buffer); + ret = GST_FLOW_OK; } } else { gst_buffer_unref (buffer); @@ -1602,6 +1648,118 @@ gst_base_parse_drain (GstBaseParse * parse) parse->priv->drain = FALSE; } +/** + * gst_base_parse_process_fragment: + * @parse: #GstBaseParse. + * + * Processes a reverse playback (forward) fragment: + * - append head of last fragment that was skipped to current fragment data + * - drain the resulting current fragment data (i.e. repeated chain) + * - add time/duration (if needed) to frames queued by chain + * - push queued data + */ +static GstFlowReturn +gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only) +{ + GstBuffer *buf; + GstFlowReturn ret = GST_FLOW_OK; + GSList *send = NULL; + + if (push_only) + goto push; + + /* restore order */ + parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending); + while (parse->priv->buffers_pending) { + buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data); + GST_LOG_OBJECT (parse, "adding pending buffer (size %d)", + GST_BUFFER_SIZE (buf)); + gst_adapter_push (parse->adapter, buf); + parse->priv->buffers_pending = + g_slist_delete_link (parse->priv->buffers_pending, + parse->priv->buffers_pending); + } + + /* invalidate so no fall-back timestamping is performed; + * ok if taken from subclass or upstream */ + parse->priv->next_ts = GST_CLOCK_TIME_NONE; + /* prevent it hanging around stop all the time */ + parse->segment.last_stop = GST_CLOCK_TIME_NONE; + /* mark next run */ + parse->priv->discont = TRUE; + + /* chain looks for frames and queues resulting ones (in stead of pushing) */ + /* initial skipped data is added to buffers_pending */ + gst_base_parse_drain (parse); + +push: + /* add metadata (if needed to queued buffers */ + GST_LOG_OBJECT (parse, "last timestamp: %" GST_TIME_FORMAT, + GST_TIME_ARGS (parse->priv->last_ts)); + while (parse->priv->buffers_queued) { + buf = GST_BUFFER_CAST (parse->priv->buffers_queued->data); + + /* no touching if upstream or parsing provided time */ + if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + GST_LOG_OBJECT (parse, "buffer has time %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + } else if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts) && + GST_BUFFER_DURATION_IS_VALID (buf)) { + if (G_LIKELY (GST_BUFFER_DURATION (buf) <= parse->priv->last_ts)) + parse->priv->last_ts -= GST_BUFFER_DURATION (buf); + else + parse->priv->last_ts = 0; + GST_BUFFER_TIMESTAMP (buf) = parse->priv->last_ts; + GST_LOG_OBJECT (parse, "applied time %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + } else { + /* no idea, very bad */ + GST_WARNING_OBJECT (parse, "could not determine time for buffer"); + } + + /* reverse order for ascending sending */ + send = g_slist_prepend (send, buf); + parse->priv->buffers_queued = + g_slist_delete_link (parse->priv->buffers_queued, + parse->priv->buffers_queued); + } + + /* send buffers */ + while (send) { + buf = GST_BUFFER_CAST (send->data); + GST_LOG_OBJECT (parse, "pushing buffer %p, timestamp %" + GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT + ", offset %" G_GINT64_FORMAT, buf, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf)); + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts))) + parse->priv->last_ts = GST_BUFFER_TIMESTAMP (buf); + + /* iterate output queue an push downstream */ + ret = gst_pad_push (parse->srcpad, buf); + send = g_slist_delete_link (send, send); + + /* clear any leftover if error */ + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + while (send) { + buf = GST_BUFFER_CAST (send->data); + gst_buffer_unref (buf); + send = g_slist_delete_link (send, send); + } + } + } + + /* any trailing unused no longer usable (ideally none) */ + if (G_UNLIKELY (gst_adapter_available (parse->adapter))) { + GST_DEBUG_OBJECT (parse, "discarding %d trailing bytes", + gst_adapter_available (parse->adapter)); + gst_adapter_clear (parse->adapter); + } + + return ret; +} + /* small helper that checks whether we have been trying to resync too long */ static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse) @@ -1647,8 +1805,18 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) if (G_UNLIKELY (parse->priv->passthrough)) { buffer = gst_buffer_make_metadata_writable (buffer); return gst_base_parse_push_buffer (parse, buffer); - } else + } + /* upstream feeding us in reverse playback; + * gather each fragment, then process it in single run */ + if (parse->segment.rate < 0.0) { + if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { + GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment"); + ret = gst_base_parse_process_fragment (parse, FALSE); + } gst_adapter_push (parse->adapter, buffer); + return ret; + } + gst_adapter_push (parse->adapter, buffer); } /* Parse and push as many frames as possible */ @@ -1701,21 +1869,30 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) } break; } + if (skip == -1) { + /* subclass didn't touch this value. By default we skip 1 byte */ + skip = 1; + } if (skip > 0) { GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); - gst_adapter_flush (parse->adapter, skip); + if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { + /* reverse playback, and no frames found yet, so we are skipping + * the leading part of a fragment, which may form the tail of + * fragment coming later, hopefully subclass skips efficiently ... */ + timestamp = gst_adapter_prev_timestamp (parse->adapter, NULL); + outbuf = gst_adapter_take_buffer (parse->adapter, skip); + outbuf = gst_buffer_make_metadata_writable (outbuf); + GST_BUFFER_TIMESTAMP (outbuf) = timestamp; + parse->priv->buffers_pending = + g_slist_prepend (parse->priv->buffers_pending, outbuf); + outbuf = NULL; + } else { + gst_adapter_flush (parse->adapter, skip); + } parse->priv->offset += skip; if (!parse->priv->discont) parse->priv->sync_offset = parse->priv->offset; parse->priv->discont = TRUE; - } else if (skip == -1) { - /* subclass didn't touch this value. By default we skip 1 byte */ - GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte"); - gst_adapter_flush (parse->adapter, 1); - parse->priv->offset++; - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->discont = TRUE; } /* There is a possibility that subclass set the skip value to zero. This means that it has probably found a frame but wants to ask @@ -1849,6 +2026,62 @@ gst_base_parse_pull_range (GstBaseParse * parse, guint size, return GST_FLOW_OK; } +static GstFlowReturn +gst_base_parse_handle_previous_fragment (GstBaseParse * parse) +{ + gint64 offset = 0; + GstClockTime ts = 0; + GstBuffer *buffer; + GstFlowReturn ret; + + GST_DEBUG_OBJECT (parse, "fragment ended; last_ts = %" GST_TIME_FORMAT + ", last_offset = %" G_GINT64_FORMAT, GST_TIME_ARGS (parse->priv->last_ts), + parse->priv->last_offset); + + if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) { + GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT, + GST_TIME_ARGS (parse->segment.start)); + ret = GST_FLOW_UNEXPECTED; + goto exit; + } + + /* last fragment started at last_offset / last_ts; + * seek back 10s capped at 1MB */ + if (parse->priv->last_ts >= 10 * GST_SECOND) + ts = parse->priv->last_ts - 10 * GST_SECOND; + /* if we are exact now, we will be more so going backwards */ + if (parse->priv->exact_position) { + offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL); + } else { + GstFormat dstformat = GST_FORMAT_BYTES; + + if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts, + &dstformat, &offset)) { + GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based"); + } + } + offset = CLAMP (offset, parse->priv->last_offset - 1024 * 1024, + parse->priv->last_offset - 1024); + offset = MAX (0, offset); + + GST_DEBUG_OBJECT (parse, "next fragment from offset %" G_GINT64_FORMAT, + offset); + parse->priv->offset = offset; + + ret = gst_base_parse_pull_range (parse, parse->priv->last_offset - offset, + &buffer); + if (ret != GST_FLOW_OK) + goto exit; + + gst_adapter_push (parse->adapter, buffer); + ret = gst_base_parse_process_fragment (parse, FALSE); + if (ret != GST_FLOW_OK) + goto exit; + +exit: + return ret; +} + /** * gst_base_parse_loop: * @pad: GstPad @@ -1861,13 +2094,23 @@ gst_base_parse_loop (GstPad * pad) GstBaseParse *parse; GstBaseParseClass *klass; GstBuffer *buffer, *outbuf; - gboolean ret = FALSE; + GstFlowReturn ret = FALSE; guint fsize = 0, min_size; gint skip = 0; parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); klass = GST_BASE_PARSE_GET_CLASS (parse); + /* reverse playback: + * first fragment (closest to stop time) is handled normally below, + * then we pull in fragments going backwards */ + if (parse->segment.rate < 0.0) { + if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts)) { + ret = gst_base_parse_handle_previous_fragment (parse); + goto done; + } + } + while (TRUE) { GST_BASE_PARSE_LOCK (parse); @@ -1875,11 +2118,8 @@ gst_base_parse_loop (GstPad * pad) GST_BASE_PARSE_UNLOCK (parse); ret = gst_base_parse_pull_range (parse, min_size, &buffer); - - if (ret == GST_FLOW_UNEXPECTED) - goto eos; - else if (ret != GST_FLOW_OK) - goto pause; + if (ret != GST_FLOW_OK) + goto done; if (parse->priv->discont) { GST_DEBUG_OBJECT (parse, "marking DISCONT"); @@ -1897,18 +2137,23 @@ gst_base_parse_loop (GstPad * pad) break; } parse->priv->drain = FALSE; + if (skip == -1) + skip = 1; if (skip > 0) { GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); + if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { + /* reverse playback, and no frames found yet, so we are skipping + * the leading part of a fragment, which may form the tail of + * fragment coming later, hopefully subclass skips efficiently ... */ + outbuf = gst_buffer_create_sub (buffer, 0, skip); + parse->priv->buffers_pending = + g_slist_prepend (parse->priv->buffers_pending, outbuf); + outbuf = NULL; + } parse->priv->offset += skip; if (!parse->priv->discont) parse->priv->sync_offset = parse->priv->offset; parse->priv->discont = TRUE; - } else if (skip == -1) { - GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte"); - parse->priv->offset++; - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->discont = TRUE; } /* skip == 0 should imply subclass set min_size to need more data ... */ GST_DEBUG_OBJECT (parse, "finding sync..."); @@ -1925,11 +2170,8 @@ gst_base_parse_loop (GstPad * pad) } else { gst_buffer_unref (buffer); ret = gst_base_parse_pull_range (parse, fsize, &outbuf); - - if (ret == GST_FLOW_UNEXPECTED) - goto eos; - else if (ret != GST_FLOW_OK) - goto pause; + if (ret != GST_FLOW_OK) + goto done; if (GST_BUFFER_SIZE (outbuf) < fsize) goto eos; } @@ -1943,10 +2185,21 @@ gst_base_parse_loop (GstPad * pad) /* This always unrefs the outbuf, even if error occurs */ ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf); - if (ret != GST_FLOW_OK) - goto pause; + /* eat expected eos signalling past segment in reverse playback */ + if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED && + parse->segment.last_stop >= parse->segment.stop) { + GST_DEBUG_OBJECT (parse, "downstream has reached end of segment"); + /* push what was accumulated during loop run */ + gst_base_parse_process_fragment (parse, TRUE); + ret = GST_FLOW_OK; + } done: + if (ret == GST_FLOW_UNEXPECTED) + goto eos; + else if (ret != GST_FLOW_OK) + goto pause; + gst_object_unref (parse); return; @@ -2599,13 +2852,13 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur, &stop_type, &stop); - GST_DEBUG_OBJECT (parse, "seek to format %s, " + GST_DEBUG_OBJECT (parse, "seek to format %s, rate %f, " "start type %d at %" GST_TIME_FORMAT ", end type %d at %" - GST_TIME_FORMAT, gst_format_get_name (format), + GST_TIME_FORMAT, gst_format_get_name (format), rate, cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop)); /* no negative rates yet */ - if (rate < 0.0) + if (rate < 0.0 && parse->priv->pad_mode == GST_ACTIVATE_PUSH) goto negative_rate; if (cur_type != GST_SEEK_TYPE_SET || @@ -2671,8 +2924,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) } GST_DEBUG_OBJECT (parse, - "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur, - seekpos); + "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, + start_ts, seekpos); GST_DEBUG_OBJECT (parse, "seek stop %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, seeksegment.stop, seekstop); @@ -2706,6 +2959,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) if (flush) { GST_DEBUG_OBJECT (parse, "sending flush stop"); gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ()); + gst_base_parse_clear_queues (parse); } else { if (parse->close_segment) gst_event_unref (parse->close_segment); @@ -2734,22 +2988,25 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) /* This will be sent later in _loop() */ parse->pending_segment = gst_event_new_new_segment (FALSE, parse->segment.rate, - parse->segment.format, parse->segment.last_stop, parse->segment.stop, - parse->segment.last_stop); + parse->segment.format, parse->segment.start, parse->segment.stop, + parse->segment.start); GST_DEBUG_OBJECT (parse, "Created newseg format %d, " "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT ", pos = %" GST_TIME_FORMAT, format, - GST_TIME_ARGS (parse->segment.last_stop), - GST_TIME_ARGS (stop), GST_TIME_ARGS (parse->segment.last_stop)); + GST_TIME_ARGS (parse->segment.start), + GST_TIME_ARGS (parse->segment.stop), + GST_TIME_ARGS (parse->segment.start)); /* mark discont if we are going to stream from another position. */ if (seekpos != parse->priv->offset) { GST_DEBUG_OBJECT (parse, "mark DISCONT, we did a seek to another position"); parse->priv->offset = seekpos; + parse->priv->last_offset = seekpos; parse->priv->discont = TRUE; parse->priv->next_ts = start_ts; + parse->priv->last_ts = GST_CLOCK_TIME_NONE; parse->priv->sync_offset = seekpos; parse->priv->exact_position = accurate; } -- 2.7.4