From: Mark Nauwelaerts Date: Tue, 14 Feb 2012 18:33:46 +0000 (+0100) Subject: baseparse: modify reverse playback handling X-Git-Tag: RELEASE-0.11.2~12 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=193e1cf16e3005e3e8bfb261b8ebc2b7f93b276a;p=platform%2Fupstream%2Fgstreamer.git baseparse: modify reverse playback handling ... so as to allow the push-mode case to provide data to subclass on a buffer by buffer basis (as in regular forward case), rather than all buffers of a fragment chucked together. Also refactor buffer handling some more, and add some debug. --- diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c index fa3c849..8767e99 100644 --- a/libs/gst/base/gstbaseparse.c +++ b/libs/gst/base/gstbaseparse.c @@ -314,6 +314,7 @@ struct _GstBaseParsePrivate /* reverse playback */ GSList *buffers_pending; + GSList *buffers_head; GSList *buffers_queued; GSList *buffers_send; GstClockTime last_ts; @@ -435,8 +436,11 @@ static gint64 gst_base_parse_find_offset (GstBaseParse * parse, static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time, gint64 * _offset); -static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse, - gboolean push_only); +static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse); +static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse, + gboolean prev_head); + +static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse); static gboolean gst_base_parse_is_seekable (GstBaseParse * parse); @@ -450,6 +454,9 @@ gst_base_parse_clear_queues (GstBaseParse * parse) NULL); g_slist_free (parse->priv->buffers_pending); parse->priv->buffers_pending = NULL; + g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL); + g_slist_free (parse->priv->buffers_head); + parse->priv->buffers_head = NULL; g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL); g_slist_free (parse->priv->buffers_send); parse->priv->buffers_send = NULL; @@ -1049,7 +1056,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) if (in_segment->rate > 0.0) gst_base_parse_drain (parse); else - gst_base_parse_process_fragment (parse, FALSE); + gst_base_parse_finish_fragment (parse, FALSE); gst_adapter_clear (parse->priv->adapter); parse->priv->offset = offset; @@ -1088,7 +1095,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) if (parse->segment.rate > 0.0) gst_base_parse_drain (parse); else - gst_base_parse_process_fragment (parse, FALSE); + gst_base_parse_finish_fragment (parse, TRUE); /* If we STILL have zero frames processed, fire an error */ if (parse->priv->framecount == 0) { @@ -1734,7 +1741,9 @@ gst_base_parse_unprepare_frame (GstBaseParse * parse, GstBaseParseFrame * frame) gst_base_parse_frame_update (parse, frame, NULL); } -/* takes ownership of @buffer */ +/* Wraps buffer in a frame and dispatches to subclass. + * Also manages data skipping and offset handling (including adapter flushing). + * Takes ownership of @buffer */ static GstFlowReturn gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, gint * skip, gint * flushed) @@ -1745,6 +1754,12 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR); + GST_LOG_OBJECT (parse, + "handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT + ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + /* track what is being flushed during this single round of frame processing */ parse->priv->flushed = 0; *skip = 0; @@ -1759,15 +1774,16 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, ret = klass->handle_frame (parse, frame, skip); gst_base_parse_unprepare_frame (parse, frame); - if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { - gst_adapter_clear (parse->priv->adapter); - } - *flushed = parse->priv->flushed; GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d", *skip, *flushed); + if (ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret); + goto exit; + } + /* subclass can only do one of these, or semantics are too unclear */ g_assert (*skip == 0 || *flushed == 0); @@ -1779,13 +1795,41 @@ gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, parse->priv->prev_frame = NULL; } + /* track skipping */ + if (*skip > 0) { + GstClockTime timestamp; + GstBuffer *outbuf; + + GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip); + if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { + /* reverse playback, and no frames found yet, so we are skipping + * the leading part of a fragment, which may form the tail of + * fragment coming later, hopefully subclass skips efficiently ... */ + timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); + outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip); + outbuf = gst_buffer_make_writable (outbuf); + GST_BUFFER_TIMESTAMP (outbuf) = timestamp; + parse->priv->buffers_head = + g_slist_prepend (parse->priv->buffers_head, outbuf); + outbuf = NULL; + } else { + gst_adapter_flush (parse->priv->adapter, *skip); + } + if (!parse->priv->discont) + parse->priv->sync_offset = parse->priv->offset; + parse->priv->offset += *skip; + parse->priv->discont = TRUE; + /* check for indefinite skipping */ + if (ret == GST_FLOW_OK) + ret = gst_base_parse_check_sync (parse); + } + parse->priv->offset += *flushed; -#ifndef GST_DISABLE_GST_DEBUG - if (ret != GST_FLOW_OK) { - GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret); +exit: + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { + gst_adapter_clear (parse->priv->adapter); } -#endif return ret; } @@ -2281,7 +2325,34 @@ gst_base_parse_send_buffers (GstBaseParse * parse) return ret; } -/* gst_base_parse_process_fragment: +/* gst_base_parse_start_fragment: + * + * Prepares for processing a reverse playback (forward) fragment + * by (re)setting proper state variables. + */ +static GstFlowReturn +gst_base_parse_start_fragment (GstBaseParse * parse) +{ + GST_LOG_OBJECT (parse, "starting fragment"); + + /* invalidate so no fall-back timestamping is performed; + * ok if taken from subclass or upstream */ + parse->priv->next_ts = GST_CLOCK_TIME_NONE; + parse->priv->prev_ts = GST_CLOCK_TIME_NONE; + /* prevent it hanging around stop all the time */ + parse->segment.position = GST_CLOCK_TIME_NONE; + /* mark next run */ + parse->priv->discont = TRUE; + + /* head of previous fragment is now pending tail of current fragment */ + parse->priv->buffers_pending = parse->priv->buffers_head; + parse->priv->buffers_head = NULL; + + return GST_FLOW_OK; +} + + +/* gst_base_parse_finish_fragment: * * Processes a reverse playback (forward) fragment: * - append head of last fragment that was skipped to current fragment data @@ -2290,40 +2361,35 @@ gst_base_parse_send_buffers (GstBaseParse * parse) * - push queued data */ static GstFlowReturn -gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only) +gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head) { GstBuffer *buf; GstFlowReturn ret = GST_FLOW_OK; gboolean seen_key = FALSE, seen_delta = FALSE; - if (push_only) - goto push; + GST_LOG_OBJECT (parse, "finishing fragment"); /* restore order */ parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending); while (parse->priv->buffers_pending) { buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data); - GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")", - gst_buffer_get_size (buf)); - gst_adapter_push (parse->priv->adapter, buf); + if (prev_head) { + GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")", + gst_buffer_get_size (buf)); + gst_adapter_push (parse->priv->adapter, buf); + } else { + GST_LOG_OBJECT (parse, "discarding head buffer"); + gst_buffer_unref (buf); + } parse->priv->buffers_pending = g_slist_delete_link (parse->priv->buffers_pending, parse->priv->buffers_pending); } - /* invalidate so no fall-back timestamping is performed; - * ok if taken from subclass or upstream */ - parse->priv->next_ts = GST_CLOCK_TIME_NONE; - /* prevent it hanging around stop all the time */ - parse->segment.position = GST_CLOCK_TIME_NONE; - /* mark next run */ - parse->priv->discont = TRUE; - /* chain looks for frames and queues resulting ones (in stead of pushing) */ /* initial skipped data is added to buffers_pending */ gst_base_parse_drain (parse); -push: if (parse->priv->buffers_send) { buf = GST_BUFFER_CAST (parse->priv->buffers_send->data); seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT); @@ -2416,7 +2482,6 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstBaseParseClass *bclass; GstBaseParse *parse; GstFlowReturn ret = GST_FLOW_OK; - GstBuffer *outbuf = NULL; GstBuffer *tmpbuf = NULL; guint fsize = 1; gint skip = -1; @@ -2516,14 +2581,13 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) return ret; } /* upstream feeding us in reverse playback; - * gather each fragment, then process it in single run */ + * finish previous fragment and start new upon DISCONT */ if (parse->segment.rate < 0.0) { if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment"); - ret = gst_base_parse_process_fragment (parse, FALSE); + ret = gst_base_parse_finish_fragment (parse, TRUE); + gst_base_parse_start_fragment (parse); } - gst_adapter_push (parse->priv->adapter, buffer); - return ret; } gst_adapter_push (parse->priv->adapter, buffer); } @@ -2577,38 +2641,17 @@ gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush); tmpbuf = NULL; + /* probably already implicitly unmapped due to adapter operation, + * but for good measure ... */ gst_adapter_unmap (parse->priv->adapter); if (ret != GST_FLOW_OK) { goto done; } - if (skip > 0) { - GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); - if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { - /* reverse playback, and no frames found yet, so we are skipping - * the leading part of a fragment, which may form the tail of - * fragment coming later, hopefully subclass skips efficiently ... */ - timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); - outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip); - outbuf = gst_buffer_make_writable (outbuf); - GST_BUFFER_TIMESTAMP (outbuf) = timestamp; - parse->priv->buffers_pending = - g_slist_prepend (parse->priv->buffers_pending, outbuf); - outbuf = NULL; - } else { - gst_adapter_flush (parse->priv->adapter, skip); - } - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->offset += skip; - parse->priv->discont = TRUE; - } else if (!flush) { + if (skip == 0 && flush == 0) { GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, " "breaking to get more data"); goto done; } - if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { - goto done; - } } /* Grab lock to prevent a race with FLUSH_START handler */ @@ -2752,8 +2795,9 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse) /* offset will increase again as fragment is processed/parsed */ parse->priv->last_offset = offset; + gst_base_parse_start_fragment (parse); gst_adapter_push (parse->priv->adapter, buffer); - ret = gst_base_parse_process_fragment (parse, FALSE); + ret = gst_base_parse_finish_fragment (parse, TRUE); if (ret != GST_FLOW_OK) goto exit; @@ -2771,7 +2815,7 @@ static GstFlowReturn gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, gboolean full) { - GstBuffer *buffer, *outbuf; + GstBuffer *buffer; GstFlowReturn ret = GST_FLOW_OK; guint fsize, min_size; gint flushed = 0; @@ -2789,14 +2833,18 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, while (TRUE) { min_size = MAX (parse->priv->min_frame_size, fsize); + GST_LOG_OBJECT (parse, "reading buffer size %u", min_size); + ret = gst_base_parse_pull_range (parse, min_size, &buffer); if (ret != GST_FLOW_OK) goto done; /* if we got a short read, inform subclass we are draining leftover * and no more is to be expected */ - if (gst_buffer_get_size (buffer) < min_size) + if (gst_buffer_get_size (buffer) < min_size) { + GST_LOG_OBJECT (parse, "... but did not get that; marked draining"); parse->priv->drain = TRUE; + } if (parse->priv->detecting) { ret = klass->detect (parse, buffer); @@ -2822,34 +2870,11 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, /* Else handle this buffer normally */ } - /* might need it later on */ - gst_buffer_ref (buffer); ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed); if (ret != GST_FLOW_OK) break; - if (skip > 0) { - GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); - if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { - /* reverse playback, and no frames found yet, so we are skipping - * the leading part of a fragment, which may form the tail of - * fragment coming later, hopefully subclass skips efficiently ... */ - outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip); - parse->priv->buffers_pending = - g_slist_prepend (parse->priv->buffers_pending, outbuf); - outbuf = NULL; - } - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->offset += skip; - parse->priv->discont = TRUE; - } else { - /* default to reasonable increase */ - fsize += 64 * 1024; - } - /* no longer needed */ - gst_buffer_unref (buffer); - /* changed offset means something happened, + /* something flushed means something happened, * and we should bail out of this loop so as not to occupy * the task thread indefinitely */ if (flushed) { @@ -2863,10 +2888,13 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, ret = GST_FLOW_EOS; break; } - parse->priv->drain = FALSE; - if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { - goto done; + /* otherwise, get some more data + * note that is checked this does not happen indefinitely */ + if (!skip) { + GST_LOG_OBJECT (parse, "getting some more data"); + fsize += 64 * 1024; } + parse->priv->drain = FALSE; } done: @@ -2905,7 +2933,7 @@ gst_base_parse_loop (GstPad * pad) parse->segment.position >= parse->segment.stop) { GST_DEBUG_OBJECT (parse, "downstream has reached end of segment"); /* push what was accumulated during loop run */ - gst_base_parse_process_fragment (parse, TRUE); + gst_base_parse_finish_fragment (parse, FALSE); /* force previous fragment */ parse->priv->offset = -1; ret = GST_FLOW_OK;