/* seek events are temporarily kept to match them with newsegments */
GSList *pending_seeks;
+ /* reverse playback */
+ GSList *buffers_pending;
+ GSList *buffers_queued;
+ GstClockTime last_ts;
+ gint64 last_offset;
+
/* property */
/* number of initial frames to discard */
gint skip;
static void gst_base_parse_post_bitrates (GstBaseParse * parse,
gboolean post_min, gboolean post_avg, gboolean post_max);
+static gint64 gst_base_parse_find_offset (GstBaseParse * parse,
+ GstClockTime time, gboolean before, GstClockTime * _ts);
+
+static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
+ gboolean push_only);
+
+static void
+gst_base_parse_clear_queues (GstBaseParse * parse)
+{
+ g_slist_foreach (parse->priv->buffers_queued, (GFunc) gst_buffer_unref, NULL);
+ g_slist_free (parse->priv->buffers_queued);
+ parse->priv->buffers_queued = NULL;
+ g_slist_foreach (parse->priv->buffers_pending, (GFunc) gst_buffer_unref,
+ NULL);
+ g_slist_free (parse->priv->buffers_pending);
+ parse->priv->buffers_pending = NULL;
+}
+
static void
gst_base_parse_finalize (GObject * object)
{
parse->priv->index = NULL;
}
+ gst_base_parse_clear_queues (parse);
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
parse->priv->idx_interval = 0;
parse->priv->exact_position = TRUE;
+ parse->priv->last_ts = GST_CLOCK_TIME_NONE;
+ parse->priv->last_offset = 0;
+
if (parse->pending_segment)
gst_event_unref (parse->pending_segment);
/* but finish the current segment */
GST_DEBUG_OBJECT (parse, "draining current segment");
- gst_base_parse_drain (parse);
+ if (parse->segment.rate > 0.0)
+ gst_base_parse_drain (parse);
+ else
+ gst_base_parse_process_fragment (parse, FALSE);
gst_adapter_clear (parse->adapter);
parse->priv->offset = offset;
parse->priv->sync_offset = offset;
parse->priv->next_ts = next_ts;
+ parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->discont = TRUE;
break;
}
case GST_EVENT_FLUSH_STOP:
gst_adapter_clear (parse->adapter);
+ gst_base_parse_clear_queues (parse);
parse->priv->flushing = FALSE;
parse->priv->discont = TRUE;
+ parse->priv->last_ts = GST_CLOCK_TIME_NONE;
break;
case GST_EVENT_EOS:
- gst_base_parse_drain (parse);
+ if (parse->segment.rate > 0.0)
+ gst_base_parse_drain (parse);
+ else
+ gst_base_parse_process_fragment (parse, FALSE);
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
/* segment adjustment magic; only if we are running the whole show */
- if (!parse->priv->passthrough &&
+ if (!parse->priv->passthrough && parse->segment.rate > 0.0 &&
(parse->priv->pad_mode == GST_ACTIVATE_PULL ||
parse->priv->upstream_seekable)) {
/* segment times are typically estimates,
gst_buffer_unref (buffer);
ret = GST_FLOW_OK;
} else if (ret == GST_FLOW_OK) {
- if (G_LIKELY (!parse->priv->skip)) {
- ret = gst_pad_push (parse->srcpad, buffer);
- GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s",
- GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
+ if (parse->segment.rate > 0.0) {
+ if (G_LIKELY (!parse->priv->skip)) {
+ ret = gst_pad_push (parse->srcpad, buffer);
+ GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %s",
+ GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
+ } else {
+ GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded",
+ GST_BUFFER_SIZE (buffer));
+ parse->priv->skip--;
+ }
} else {
- GST_DEBUG_OBJECT (parse, "initial frame (%d bytes) discarded",
+ GST_LOG_OBJECT (parse, "frame (%d bytes) queued for now: %d",
GST_BUFFER_SIZE (buffer));
- parse->priv->skip--;
+ parse->priv->buffers_queued =
+ g_slist_prepend (parse->priv->buffers_queued, buffer);
+ ret = GST_FLOW_OK;
}
} else {
gst_buffer_unref (buffer);
parse->priv->drain = FALSE;
}
+/**
+ * gst_base_parse_process_fragment:
+ * @parse: #GstBaseParse.
+ *
+ * Processes a reverse playback (forward) fragment:
+ * - append head of last fragment that was skipped to current fragment data
+ * - drain the resulting current fragment data (i.e. repeated chain)
+ * - add time/duration (if needed) to frames queued by chain
+ * - push queued data
+ */
+static GstFlowReturn
+gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
+{
+ GstBuffer *buf;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GSList *send = NULL;
+
+ if (push_only)
+ goto push;
+
+ /* restore order */
+ parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
+ while (parse->priv->buffers_pending) {
+ buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
+ GST_LOG_OBJECT (parse, "adding pending buffer (size %d)",
+ GST_BUFFER_SIZE (buf));
+ gst_adapter_push (parse->adapter, buf);
+ parse->priv->buffers_pending =
+ g_slist_delete_link (parse->priv->buffers_pending,
+ parse->priv->buffers_pending);
+ }
+
+ /* invalidate so no fall-back timestamping is performed;
+ * ok if taken from subclass or upstream */
+ parse->priv->next_ts = GST_CLOCK_TIME_NONE;
+ /* prevent it hanging around stop all the time */
+ parse->segment.last_stop = GST_CLOCK_TIME_NONE;
+ /* mark next run */
+ parse->priv->discont = TRUE;
+
+ /* chain looks for frames and queues resulting ones (in stead of pushing) */
+ /* initial skipped data is added to buffers_pending */
+ gst_base_parse_drain (parse);
+
+push:
+ /* add metadata (if needed to queued buffers */
+ GST_LOG_OBJECT (parse, "last timestamp: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (parse->priv->last_ts));
+ while (parse->priv->buffers_queued) {
+ buf = GST_BUFFER_CAST (parse->priv->buffers_queued->data);
+
+ /* no touching if upstream or parsing provided time */
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ GST_LOG_OBJECT (parse, "buffer has time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+ } else if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts) &&
+ GST_BUFFER_DURATION_IS_VALID (buf)) {
+ if (G_LIKELY (GST_BUFFER_DURATION (buf) <= parse->priv->last_ts))
+ parse->priv->last_ts -= GST_BUFFER_DURATION (buf);
+ else
+ parse->priv->last_ts = 0;
+ GST_BUFFER_TIMESTAMP (buf) = parse->priv->last_ts;
+ GST_LOG_OBJECT (parse, "applied time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+ } else {
+ /* no idea, very bad */
+ GST_WARNING_OBJECT (parse, "could not determine time for buffer");
+ }
+
+ /* reverse order for ascending sending */
+ send = g_slist_prepend (send, buf);
+ parse->priv->buffers_queued =
+ g_slist_delete_link (parse->priv->buffers_queued,
+ parse->priv->buffers_queued);
+ }
+
+ /* send buffers */
+ while (send) {
+ buf = GST_BUFFER_CAST (send->data);
+ GST_LOG_OBJECT (parse, "pushing buffer %p, timestamp %"
+ GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
+ ", offset %" G_GINT64_FORMAT, buf,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf));
+
+ if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts)))
+ parse->priv->last_ts = GST_BUFFER_TIMESTAMP (buf);
+
+ /* iterate output queue an push downstream */
+ ret = gst_pad_push (parse->srcpad, buf);
+ send = g_slist_delete_link (send, send);
+
+ /* clear any leftover if error */
+ if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+ while (send) {
+ buf = GST_BUFFER_CAST (send->data);
+ gst_buffer_unref (buf);
+ send = g_slist_delete_link (send, send);
+ }
+ }
+ }
+
+ /* any trailing unused no longer usable (ideally none) */
+ if (G_UNLIKELY (gst_adapter_available (parse->adapter))) {
+ GST_DEBUG_OBJECT (parse, "discarding %d trailing bytes",
+ gst_adapter_available (parse->adapter));
+ gst_adapter_clear (parse->adapter);
+ }
+
+ return ret;
+}
+
/* small helper that checks whether we have been trying to resync too long */
static inline GstFlowReturn
gst_base_parse_check_sync (GstBaseParse * parse)
if (G_UNLIKELY (parse->priv->passthrough)) {
buffer = gst_buffer_make_metadata_writable (buffer);
return gst_base_parse_push_buffer (parse, buffer);
- } else
+ }
+ /* upstream feeding us in reverse playback;
+ * gather each fragment, then process it in single run */
+ if (parse->segment.rate < 0.0) {
+ if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
+ GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
+ ret = gst_base_parse_process_fragment (parse, FALSE);
+ }
gst_adapter_push (parse->adapter, buffer);
+ return ret;
+ }
+ gst_adapter_push (parse->adapter, buffer);
}
/* Parse and push as many frames as possible */
}
break;
}
+ if (skip == -1) {
+ /* subclass didn't touch this value. By default we skip 1 byte */
+ skip = 1;
+ }
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
- gst_adapter_flush (parse->adapter, skip);
+ if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
+ /* reverse playback, and no frames found yet, so we are skipping
+ * the leading part of a fragment, which may form the tail of
+ * fragment coming later, hopefully subclass skips efficiently ... */
+ timestamp = gst_adapter_prev_timestamp (parse->adapter, NULL);
+ outbuf = gst_adapter_take_buffer (parse->adapter, skip);
+ outbuf = gst_buffer_make_metadata_writable (outbuf);
+ GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
+ parse->priv->buffers_pending =
+ g_slist_prepend (parse->priv->buffers_pending, outbuf);
+ outbuf = NULL;
+ } else {
+ gst_adapter_flush (parse->adapter, skip);
+ }
parse->priv->offset += skip;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
- } else if (skip == -1) {
- /* subclass didn't touch this value. By default we skip 1 byte */
- GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
- gst_adapter_flush (parse->adapter, 1);
- parse->priv->offset++;
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->discont = TRUE;
}
/* There is a possibility that subclass set the skip value to zero.
This means that it has probably found a frame but wants to ask
return GST_FLOW_OK;
}
+static GstFlowReturn
+gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
+{
+ gint64 offset = 0;
+ GstClockTime ts = 0;
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ GST_DEBUG_OBJECT (parse, "fragment ended; last_ts = %" GST_TIME_FORMAT
+ ", last_offset = %" G_GINT64_FORMAT, GST_TIME_ARGS (parse->priv->last_ts),
+ parse->priv->last_offset);
+
+ if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) {
+ GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (parse->segment.start));
+ ret = GST_FLOW_UNEXPECTED;
+ goto exit;
+ }
+
+ /* last fragment started at last_offset / last_ts;
+ * seek back 10s capped at 1MB */
+ if (parse->priv->last_ts >= 10 * GST_SECOND)
+ ts = parse->priv->last_ts - 10 * GST_SECOND;
+ /* if we are exact now, we will be more so going backwards */
+ if (parse->priv->exact_position) {
+ offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL);
+ } else {
+ GstFormat dstformat = GST_FORMAT_BYTES;
+
+ if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts,
+ &dstformat, &offset)) {
+ GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based");
+ }
+ }
+ offset = CLAMP (offset, parse->priv->last_offset - 1024 * 1024,
+ parse->priv->last_offset - 1024);
+ offset = MAX (0, offset);
+
+ GST_DEBUG_OBJECT (parse, "next fragment from offset %" G_GINT64_FORMAT,
+ offset);
+ parse->priv->offset = offset;
+
+ ret = gst_base_parse_pull_range (parse, parse->priv->last_offset - offset,
+ &buffer);
+ if (ret != GST_FLOW_OK)
+ goto exit;
+
+ gst_adapter_push (parse->adapter, buffer);
+ ret = gst_base_parse_process_fragment (parse, FALSE);
+ if (ret != GST_FLOW_OK)
+ goto exit;
+
+exit:
+ return ret;
+}
+
/**
* gst_base_parse_loop:
* @pad: GstPad
GstBaseParse *parse;
GstBaseParseClass *klass;
GstBuffer *buffer, *outbuf;
- gboolean ret = FALSE;
+ GstFlowReturn ret = FALSE;
guint fsize = 0, min_size;
gint skip = 0;
parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
klass = GST_BASE_PARSE_GET_CLASS (parse);
+ /* reverse playback:
+ * first fragment (closest to stop time) is handled normally below,
+ * then we pull in fragments going backwards */
+ if (parse->segment.rate < 0.0) {
+ if (GST_CLOCK_TIME_IS_VALID (parse->priv->last_ts)) {
+ ret = gst_base_parse_handle_previous_fragment (parse);
+ goto done;
+ }
+ }
+
while (TRUE) {
GST_BASE_PARSE_LOCK (parse);
GST_BASE_PARSE_UNLOCK (parse);
ret = gst_base_parse_pull_range (parse, min_size, &buffer);
-
- if (ret == GST_FLOW_UNEXPECTED)
- goto eos;
- else if (ret != GST_FLOW_OK)
- goto pause;
+ if (ret != GST_FLOW_OK)
+ goto done;
if (parse->priv->discont) {
GST_DEBUG_OBJECT (parse, "marking DISCONT");
break;
}
parse->priv->drain = FALSE;
+ if (skip == -1)
+ skip = 1;
if (skip > 0) {
GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
+ if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
+ /* reverse playback, and no frames found yet, so we are skipping
+ * the leading part of a fragment, which may form the tail of
+ * fragment coming later, hopefully subclass skips efficiently ... */
+ outbuf = gst_buffer_create_sub (buffer, 0, skip);
+ parse->priv->buffers_pending =
+ g_slist_prepend (parse->priv->buffers_pending, outbuf);
+ outbuf = NULL;
+ }
parse->priv->offset += skip;
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
- } else if (skip == -1) {
- GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
- parse->priv->offset++;
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->discont = TRUE;
}
/* skip == 0 should imply subclass set min_size to need more data ... */
GST_DEBUG_OBJECT (parse, "finding sync...");
} else {
gst_buffer_unref (buffer);
ret = gst_base_parse_pull_range (parse, fsize, &outbuf);
-
- if (ret == GST_FLOW_UNEXPECTED)
- goto eos;
- else if (ret != GST_FLOW_OK)
- goto pause;
+ if (ret != GST_FLOW_OK)
+ goto done;
if (GST_BUFFER_SIZE (outbuf) < fsize)
goto eos;
}
/* This always unrefs the outbuf, even if error occurs */
ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
- if (ret != GST_FLOW_OK)
- goto pause;
+ /* eat expected eos signalling past segment in reverse playback */
+ if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED &&
+ parse->segment.last_stop >= parse->segment.stop) {
+ GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
+ /* push what was accumulated during loop run */
+ gst_base_parse_process_fragment (parse, TRUE);
+ ret = GST_FLOW_OK;
+ }
done:
+ if (ret == GST_FLOW_UNEXPECTED)
+ goto eos;
+ else if (ret != GST_FLOW_OK)
+ goto pause;
+
gst_object_unref (parse);
return;
gst_event_parse_seek (event, &rate, &format, &flags,
&cur_type, &cur, &stop_type, &stop);
- GST_DEBUG_OBJECT (parse, "seek to format %s, "
+ GST_DEBUG_OBJECT (parse, "seek to format %s, rate %f, "
"start type %d at %" GST_TIME_FORMAT ", end type %d at %"
- GST_TIME_FORMAT, gst_format_get_name (format),
+ GST_TIME_FORMAT, gst_format_get_name (format), rate,
cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop));
/* no negative rates yet */
- if (rate < 0.0)
+ if (rate < 0.0 && parse->priv->pad_mode == GST_ACTIVATE_PUSH)
goto negative_rate;
if (cur_type != GST_SEEK_TYPE_SET ||
}
GST_DEBUG_OBJECT (parse,
- "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
- seekpos);
+ "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT,
+ start_ts, seekpos);
GST_DEBUG_OBJECT (parse,
"seek stop %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT,
seeksegment.stop, seekstop);
if (flush) {
GST_DEBUG_OBJECT (parse, "sending flush stop");
gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ());
+ gst_base_parse_clear_queues (parse);
} else {
if (parse->close_segment)
gst_event_unref (parse->close_segment);
/* This will be sent later in _loop() */
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
- parse->segment.format, parse->segment.last_stop, parse->segment.stop,
- parse->segment.last_stop);
+ parse->segment.format, parse->segment.start, parse->segment.stop,
+ parse->segment.start);
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
", pos = %" GST_TIME_FORMAT, format,
- GST_TIME_ARGS (parse->segment.last_stop),
- GST_TIME_ARGS (stop), GST_TIME_ARGS (parse->segment.last_stop));
+ GST_TIME_ARGS (parse->segment.start),
+ GST_TIME_ARGS (parse->segment.stop),
+ GST_TIME_ARGS (parse->segment.start));
/* mark discont if we are going to stream from another position. */
if (seekpos != parse->priv->offset) {
GST_DEBUG_OBJECT (parse,
"mark DISCONT, we did a seek to another position");
parse->priv->offset = seekpos;
+ parse->priv->last_offset = seekpos;
parse->priv->discont = TRUE;
parse->priv->next_ts = start_ts;
+ parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->sync_offset = seekpos;
parse->priv->exact_position = accurate;
}