guint max_bitrate;
guint posted_avg_bitrate;
- GList *pending_events;
-
/* frames/buffers that are queued and ready to go on OK */
GQueue queued_frames;
/* reverse playback */
GSList *buffers_pending;
+ GSList *buffers_head;
GSList *buffers_queued;
GSList *buffers_send;
GstClockTime last_ts;
gint64 last_offset;
+ /* Pending serialized events */
+ GList *pending_events;
/* Newsegment event to be sent after SEEK */
- GstEvent *pending_segment;
+ gboolean pending_segment;
- /* frame previously passed to subclass (might be re-used) */
- GstBaseParseFrame *prev_frame;
- /* offset corresponding to above frame */
+ /* offset of last parsed frame/data */
gint64 prev_offset;
+ /* force a new frame, regardless of offset */
+ gboolean new_frame;
/* whether we are merely scanning for a frame */
gboolean scanning;
/* ... and resulting frame, if any */
static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse,
GstClockTime * _time, gint64 * _offset);
-static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
- gboolean push_only);
+static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse);
+static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse,
+ gboolean prev_head);
+
+static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse);
static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
NULL);
g_slist_free (parse->priv->buffers_pending);
parse->priv->buffers_pending = NULL;
+ g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL);
+ g_slist_free (parse->priv->buffers_head);
+ parse->priv->buffers_head = NULL;
g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_send);
parse->priv->buffers_send = NULL;
g_queue_foreach (&parse->priv->queued_frames,
(GFunc) gst_base_parse_frame_free, NULL);
g_queue_clear (&parse->priv->queued_frames);
+
+ gst_buffer_replace (&parse->priv->cache, NULL);
+
+ g_list_foreach (parse->priv->pending_events, (GFunc) gst_event_unref, NULL);
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
static void
gst_base_parse_finalize (GObject * object)
{
GstBaseParse *parse = GST_BASE_PARSE (object);
- GstEvent **p_ev;
g_object_unref (parse->priv->adapter);
- if (parse->priv->pending_segment) {
- p_ev = &parse->priv->pending_segment;
- gst_event_replace (p_ev, NULL);
- }
-
if (parse->priv->cache) {
gst_buffer_unref (parse->priv->cache);
parse->priv->cache = NULL;
NULL);
g_list_free (parse->priv->pending_events);
parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
if (parse->priv->index) {
gst_object_unref (parse->priv->index);
#endif
/* Default handlers */
+ klass->sink_event = gst_base_parse_sink_eventfunc;
klass->src_event = gst_base_parse_src_eventfunc;
klass->convert = gst_base_parse_convert_default;
GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate));
gst_pad_set_activatemode_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_mode));
+ GST_PAD_SET_PROXY_ALLOCATION (parse->sinkpad);
gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
GST_DEBUG_OBJECT (parse, "sinkpad created");
* then use gst_base_parse_frame_init() to initialise it.
*
* Returns: a newly-allocated #GstBaseParseFrame. Free with
- * gst_base_parse_frame_free() when no longer needed, unless you gave
- * away ownership to gst_base_parse_push_frame().
+ * gst_base_parse_frame_free() when no longer needed.
*
* Since: 0.10.33
*/
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->last_offset = 0;
- if (parse->priv->pending_segment) {
- gst_event_unref (parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
- }
-
g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
NULL);
g_list_free (parse->priv->pending_events);
parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
if (parse->priv->cache) {
gst_buffer_unref (parse->priv->cache);
if (parse->priv->adapter)
gst_adapter_clear (parse->priv->adapter);
- /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */
- if (parse->priv->prev_frame) {
- gst_base_parse_frame_free (parse->priv->prev_frame);
- parse->priv->prev_frame = NULL;
- }
+ parse->priv->new_frame = TRUE;
g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL);
g_list_free (parse->priv->detect_buffers);
{
GstBaseParse *parse;
GstBaseParseClass *bclass;
- gboolean handled = FALSE;
- gboolean ret = TRUE;
+ gboolean ret;
parse = GST_BASE_PARSE (parent);
bclass = GST_BASE_PARSE_GET_CLASS (parse);
GST_DEBUG_OBJECT (parse, "handling event %d, %s", GST_EVENT_TYPE (event),
GST_EVENT_TYPE_NAME (event));
- /* Cache all events except EOS, SEGMENT and FLUSH_STOP if we have a
+ /* Cache all serialized events except EOS, SEGMENT and FLUSH_STOP if we have a
* pending segment */
- if (parse->priv->pending_segment && GST_EVENT_TYPE (event) != GST_EVENT_EOS
+ if (parse->priv->pending_segment && GST_EVENT_IS_SERIALIZED (event)
+ && GST_EVENT_TYPE (event) != GST_EVENT_EOS
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_START
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP
g_list_append (parse->priv->pending_events, event);
ret = TRUE;
} else {
-
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
parse->priv->framecount < MIN_FRAMES_TO_POST_BITRATE)
/* We've not posted bitrate tags yet - do so now */
gst_base_parse_post_bitrates (parse, TRUE, TRUE, TRUE);
- if (bclass->event)
- handled = bclass->event (parse, event);
-
- if (!handled)
- handled = gst_base_parse_sink_eventfunc (parse, event);
-
- if (!handled)
- ret = gst_pad_event_default (pad, parent, event);
+ if (bclass->sink_event)
+ ret = bclass->sink_event (parse, event);
+ else {
+ gst_event_unref (event);
+ ret = FALSE;
+ }
}
GST_DEBUG_OBJECT (parse, "event handled");
static gboolean
gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
{
- gboolean handled = FALSE;
- GstEvent **eventp;
+ gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
if (klass->set_sink_caps)
- klass->set_sink_caps (parse, caps);
+ ret = klass->set_sink_caps (parse, caps);
+ else
+ ret = TRUE;
/* will send our own caps downstream */
gst_event_unref (event);
- handled = TRUE;
break;
}
case GST_EVENT_SEGMENT:
/* save the segment for later, right before we push a new buffer so that
* the caps are fixed and the next linked element can receive
* the segment. */
- eventp = &parse->priv->pending_segment;
- gst_event_replace (eventp, event);
- gst_event_unref (event);
- handled = TRUE;
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events, event);
+ parse->priv->pending_segment = TRUE;
+ ret = TRUE;
/* but finish the current segment */
GST_DEBUG_OBJECT (parse, "draining current segment");
if (in_segment->rate > 0.0)
gst_base_parse_drain (parse);
else
- gst_base_parse_process_fragment (parse, FALSE);
+ gst_base_parse_finish_fragment (parse, FALSE);
gst_adapter_clear (parse->priv->adapter);
parse->priv->offset = offset;
case GST_EVENT_FLUSH_START:
parse->priv->flushing = TRUE;
- handled = gst_pad_push_event (parse->srcpad, gst_event_ref (event));
- if (handled)
- gst_event_unref (event);
+ ret = gst_pad_push_event (parse->srcpad, event);
/* Wait for _chain() to exit by taking the srcpad STREAM_LOCK */
GST_PAD_STREAM_LOCK (parse->srcpad);
GST_PAD_STREAM_UNLOCK (parse->srcpad);
-
break;
case GST_EVENT_FLUSH_STOP:
+ ret = gst_pad_push_event (parse->srcpad, event);
gst_adapter_clear (parse->priv->adapter);
gst_base_parse_clear_queues (parse);
parse->priv->flushing = FALSE;
parse->priv->discont = TRUE;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
- if (parse->priv->prev_frame) {
- gst_base_parse_frame_free (parse->priv->prev_frame);
- parse->priv->prev_frame = NULL;
- }
+ parse->priv->new_frame = TRUE;
break;
case GST_EVENT_EOS:
if (parse->segment.rate > 0.0)
gst_base_parse_drain (parse);
else
- gst_base_parse_process_fragment (parse, FALSE);
+ gst_base_parse_finish_fragment (parse, TRUE);
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE,
("No valid frames found before end of stream"), (NULL));
}
- /* newsegment before eos */
- if (parse->priv->pending_segment) {
- gst_pad_push_event (parse->srcpad, parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
+ /* newsegment and other serialized events before eos */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
+
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
+ }
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
+ ret = gst_pad_push_event (parse->srcpad, event);
break;
default:
+ ret =
+ gst_pad_event_default (parse->sinkpad, GST_OBJECT_CAST (parse),
+ event);
break;
}
-
- return handled;
+ return ret;
}
static gboolean
{
GstBaseParse *parse;
GstBaseParseClass *bclass;
- gboolean handled = FALSE;
gboolean ret = TRUE;
parse = GST_BASE_PARSE (parent);
GST_EVENT_TYPE_NAME (event));
if (bclass->src_event)
- handled = bclass->src_event (parse, event);
-
- if (!handled)
- ret = gst_pad_event_default (pad, parent, event);
+ ret = bclass->src_event (parse, event);
+ else
+ gst_event_unref (event);
return ret;
}
static gboolean
gst_base_parse_src_eventfunc (GstBaseParse * parse, GstEvent * event)
{
- gboolean handled = FALSE;
+ gboolean res = FALSE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
- {
- if (gst_base_parse_is_seekable (parse)) {
- handled = gst_base_parse_handle_seek (parse, event);
- }
+ if (gst_base_parse_is_seekable (parse))
+ res = gst_base_parse_handle_seek (parse, event);
break;
- }
default:
+ res = gst_pad_event_default (parse->srcpad, GST_OBJECT_CAST (parse),
+ event);
break;
}
- return handled;
+ return res;
}
/* need at least some frames */
if (!parse->priv->framecount)
- return FALSE;
+ goto no_framecount;
duration = parse->priv->acc_duration / GST_MSECOND;
bytes = parse->priv->bytecount;
if (G_UNLIKELY (!duration || !bytes))
- return FALSE;
+ goto no_duration_bytes;
if (src_format == GST_FORMAT_BYTES) {
if (dest_format == GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (parse, "conversion result: %" G_GINT64_FORMAT " ms",
*dest_value / GST_MSECOND);
ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting bytes -> other not implemented");
}
} else if (src_format == GST_FORMAT_TIME) {
if (dest_format == GST_FORMAT_BYTES) {
"time %" G_GINT64_FORMAT " ms in bytes = %" G_GINT64_FORMAT,
src_value / GST_MSECOND, *dest_value);
ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting time -> other not implemented");
}
} else if (src_format == GST_FORMAT_DEFAULT) {
/* DEFAULT == frame-based */
if (dest_format == GST_FORMAT_TIME) {
+ GST_DEBUG_OBJECT (parse, "converting default -> time");
if (parse->priv->fps_den) {
*dest_value = gst_util_uint64_scale (src_value,
GST_SECOND * parse->priv->fps_den, parse->priv->fps_num);
ret = TRUE;
}
- } else if (dest_format == GST_FORMAT_BYTES) {
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting default -> other not implemented");
}
+ } else {
+ GST_DEBUG_OBJECT (parse, "conversion not implemented");
}
-
return ret;
+
+ /* ERRORS */
+no_framecount:
+ {
+ GST_DEBUG_OBJECT (parse, "no framecount");
+ return FALSE;
+ }
+no_duration_bytes:
+ {
+ GST_DEBUG_OBJECT (parse, "no duration %" G_GUINT64_FORMAT ", bytes %"
+ G_GUINT64_FORMAT, duration, bytes);
+ return FALSE;
+ }
+
}
static void
GST_BUFFER_OFFSET (buffer) = parse->priv->offset;
- if (parse->priv->prev_frame) {
- if (parse->priv->prev_offset == parse->priv->offset) {
- frame = parse->priv->prev_frame;
- } else {
- gst_base_parse_frame_free (parse->priv->prev_frame);
- }
- parse->priv->prev_frame = NULL;
- }
-
- if (!frame) {
- frame = gst_base_parse_frame_new (buffer, 0, 0);
- }
+ frame = gst_base_parse_frame_new (buffer, 0, 0);
/* also ensure to update state flags */
gst_base_parse_frame_update (parse, frame, buffer);
gst_buffer_unref (buffer);
+ if (parse->priv->prev_offset != parse->priv->offset || parse->priv->new_frame) {
+ GST_LOG_OBJECT (parse, "marking as new frame");
+ parse->priv->new_frame = FALSE;
+ frame->flags |= GST_BASE_PARSE_FRAME_FLAG_NEW_FRAME;
+ }
+
frame->offset = parse->priv->prev_offset = parse->priv->offset;
/* use default handler to provide initial (upstream) metadata */
return frame;
}
-static void
-gst_base_parse_unprepare_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
-{
- g_assert (parse->priv->prev_frame == NULL);
-
- parse->priv->prev_frame = frame;
- gst_base_parse_frame_update (parse, frame, NULL);
-}
-
-/* takes ownership of @buffer */
+/* Wraps buffer in a frame and dispatches to subclass.
+ * Also manages data skipping and offset handling (including adapter flushing).
+ * Takes ownership of @buffer */
static GstFlowReturn
gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
gint * skip, gint * flushed)
g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR);
+ GST_LOG_OBJECT (parse,
+ "handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT
+ ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
/* track what is being flushed during this single round of frame processing */
parse->priv->flushed = 0;
*skip = 0;
frame = gst_base_parse_prepare_frame (parse, buffer);
ret = klass->handle_frame (parse, frame, skip);
- gst_base_parse_unprepare_frame (parse, frame);
-
- if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
- gst_adapter_clear (parse->priv->adapter);
- }
*flushed = parse->priv->flushed;
GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d",
*skip, *flushed);
+ if (ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
+ goto exit;
+ }
+
/* subclass can only do one of these, or semantics are too unclear */
g_assert (*skip == 0 || *flushed == 0);
- /* if it did something, clear frame state,
- * though this should also trigger the offset check anyway */
- if (*skip != 0 || *flushed != 0) {
- GST_LOG_OBJECT (parse, "clearing prev frame");
- gst_base_parse_frame_free (parse->priv->prev_frame);
- parse->priv->prev_frame = NULL;
+ /* track skipping */
+ if (*skip > 0) {
+ GstClockTime timestamp;
+ GstBuffer *outbuf;
+
+ GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip);
+ if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
+ /* reverse playback, and no frames found yet, so we are skipping
+ * the leading part of a fragment, which may form the tail of
+ * fragment coming later, hopefully subclass skips efficiently ... */
+ timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
+ outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip);
+ outbuf = gst_buffer_make_writable (outbuf);
+ GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
+ parse->priv->buffers_head =
+ g_slist_prepend (parse->priv->buffers_head, outbuf);
+ outbuf = NULL;
+ } else {
+ gst_adapter_flush (parse->priv->adapter, *skip);
+ }
+ if (!parse->priv->discont)
+ parse->priv->sync_offset = parse->priv->offset;
+ parse->priv->offset += *skip;
+ parse->priv->discont = TRUE;
+ /* check for indefinite skipping */
+ if (ret == GST_FLOW_OK)
+ ret = gst_base_parse_check_sync (parse);
}
parse->priv->offset += *flushed;
-#ifndef GST_DISABLE_GST_DEBUG
- if (ret != GST_FLOW_OK) {
- GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
+exit:
+ if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
+ gst_adapter_clear (parse->priv->adapter);
}
-#endif
+
+ gst_base_parse_frame_free (frame);
return ret;
}
if (!gst_pad_has_current_caps (parse->srcpad))
goto no_caps;
+ if (G_UNLIKELY (parse->priv->pending_segment)) {
+ /* have caps; check identity */
+ gst_base_parse_check_media (parse);
+ }
+
+ /* Push pending events, including NEWSEGMENT events */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
+
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
+ }
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
+ }
+
/* segment adjustment magic; only if we are running the whole show */
if (!parse->priv->passthrough && parse->segment.rate > 0.0 &&
(parse->priv->pad_mode == GST_PAD_MODE_PULL ||
parse->priv->upstream_seekable)) {
- /* segment times are typically estimates,
- * actual frame data might lead subclass to different timestamps,
- * so override segment start from what is supplied there */
- if (G_UNLIKELY (parse->priv->pending_segment && !parse->priv->exact_position
- && GST_CLOCK_TIME_IS_VALID (last_start))) {
- gst_event_unref (parse->priv->pending_segment);
- parse->segment.start =
- MIN ((guint64) last_start, (guint64) parse->segment.stop);
-
- GST_DEBUG_OBJECT (parse,
- "adjusting pending segment start to %" GST_TIME_FORMAT,
- GST_TIME_ARGS (parse->segment.start));
-
- parse->priv->pending_segment = gst_event_new_segment (&parse->segment);
- }
- /* handle gaps, e.g. non-zero start-time, in as much not handled by above */
+ /* handle gaps */
if (GST_CLOCK_TIME_IS_VALID (parse->segment.position) &&
GST_CLOCK_TIME_IS_VALID (last_start)) {
GstClockTimeDiff diff;
GST_TIME_ARGS (parse->segment.position),
GST_TIME_ARGS (last_start));
- if (G_UNLIKELY (parse->priv->pending_segment)) {
- gst_event_unref (parse->priv->pending_segment);
- parse->segment.start = last_start;
- parse->segment.time = last_start;
- parse->priv->pending_segment =
- gst_event_new_segment (&parse->segment);
- } else {
- /* skip gap FIXME */
- gst_pad_push_event (parse->srcpad,
- gst_event_new_segment (&parse->segment));
- }
+ /* skip gap FIXME */
+ gst_pad_push_event (parse->srcpad,
+ gst_event_new_segment (&parse->segment));
+
parse->segment.position = last_start;
}
}
}
- if (G_UNLIKELY (parse->priv->pending_segment)) {
- GstEvent *pending_segment;
-
- pending_segment = parse->priv->pending_segment;
- parse->priv->pending_segment = NULL;
-
- GST_DEBUG_OBJECT (parse, "%s push pending segment",
- parse->priv->pad_mode == GST_PAD_MODE_PULL ? "loop" : "chain");
- gst_pad_push_event (parse->srcpad, pending_segment);
-
- /* have caps; check identity */
- gst_base_parse_check_media (parse);
- }
-
/* update bitrates and optionally post corresponding tags
* (following newsegment) */
gst_base_parse_update_bitrates (parse, frame);
- if (G_UNLIKELY (parse->priv->pending_events)) {
- GList *l;
-
- for (l = parse->priv->pending_events; l != NULL; l = l->next) {
- gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
- }
- g_list_free (parse->priv->pending_events);
- parse->priv->pending_events = NULL;
- }
-
if (klass->pre_push_frame) {
ret = klass->pre_push_frame (parse, frame);
} else {
* If @frame's out_buffer is set, that will be used as subsequent frame data.
* Otherwise, @size samples will be taken from the input and used for output,
* and the output's metadata (timestamps etc) will be taken as (optionally)
- * set by the subclass on @frame's (input) buffer.
+ * set by the subclass on @frame's (input) buffer (which is otherwise
+ * ignored for any but the above purpose/information).
*
* Note that the latter buffer is invalidated by this call, whereas the
* caller retains ownership of @frame.
gst_buffer_unref (frame->out_buffer);
frame->out_buffer = NULL;
+ /* mark input size consumed */
+ frame->size = size;
+
/* subclass might queue frames/data internally if it needs more
* frames to decide on the format, or might request us to queue here. */
if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_DROP) {
return ret;
}
-/* gst_base_parse_process_fragment:
+/* gst_base_parse_start_fragment:
+ *
+ * Prepares for processing a reverse playback (forward) fragment
+ * by (re)setting proper state variables.
+ */
+static GstFlowReturn
+gst_base_parse_start_fragment (GstBaseParse * parse)
+{
+ GST_LOG_OBJECT (parse, "starting fragment");
+
+ /* invalidate so no fall-back timestamping is performed;
+ * ok if taken from subclass or upstream */
+ parse->priv->next_ts = GST_CLOCK_TIME_NONE;
+ parse->priv->prev_ts = GST_CLOCK_TIME_NONE;
+ /* prevent it hanging around stop all the time */
+ parse->segment.position = GST_CLOCK_TIME_NONE;
+ /* mark next run */
+ parse->priv->discont = TRUE;
+
+ /* head of previous fragment is now pending tail of current fragment */
+ parse->priv->buffers_pending = parse->priv->buffers_head;
+ parse->priv->buffers_head = NULL;
+
+ return GST_FLOW_OK;
+}
+
+
+/* gst_base_parse_finish_fragment:
*
* Processes a reverse playback (forward) fragment:
* - append head of last fragment that was skipped to current fragment data
* - push queued data
*/
static GstFlowReturn
-gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
+gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head)
{
GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK;
gboolean seen_key = FALSE, seen_delta = FALSE;
- if (push_only)
- goto push;
+ GST_LOG_OBJECT (parse, "finishing fragment");
/* restore order */
parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
while (parse->priv->buffers_pending) {
buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
- GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
- gst_buffer_get_size (buf));
- gst_adapter_push (parse->priv->adapter, buf);
+ if (prev_head) {
+ GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
+ gst_buffer_get_size (buf));
+ gst_adapter_push (parse->priv->adapter, buf);
+ } else {
+ GST_LOG_OBJECT (parse, "discarding head buffer");
+ gst_buffer_unref (buf);
+ }
parse->priv->buffers_pending =
g_slist_delete_link (parse->priv->buffers_pending,
parse->priv->buffers_pending);
}
- /* invalidate so no fall-back timestamping is performed;
- * ok if taken from subclass or upstream */
- parse->priv->next_ts = GST_CLOCK_TIME_NONE;
- /* prevent it hanging around stop all the time */
- parse->segment.position = GST_CLOCK_TIME_NONE;
- /* mark next run */
- parse->priv->discont = TRUE;
-
/* chain looks for frames and queues resulting ones (in stead of pushing) */
/* initial skipped data is added to buffers_pending */
gst_base_parse_drain (parse);
-push:
if (parse->priv->buffers_send) {
buf = GST_BUFFER_CAST (parse->priv->buffers_send->data);
seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
GstBaseParseClass *bclass;
GstBaseParse *parse;
GstFlowReturn ret = GST_FLOW_OK;
- GstBuffer *outbuf = NULL;
GstBuffer *tmpbuf = NULL;
guint fsize = 1;
gint skip = -1;
return ret;
}
/* upstream feeding us in reverse playback;
- * gather each fragment, then process it in single run */
+ * finish previous fragment and start new upon DISCONT */
if (parse->segment.rate < 0.0) {
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
- ret = gst_base_parse_process_fragment (parse, FALSE);
+ ret = gst_base_parse_finish_fragment (parse, TRUE);
+ gst_base_parse_start_fragment (parse);
}
- gst_adapter_push (parse->priv->adapter, buffer);
- return ret;
}
gst_adapter_push (parse->priv->adapter, buffer);
}
while (!parse->priv->flushing) {
gint flush = 0;
- /* Synchronization loop */
- for (;;) {
- /* note: if subclass indicates MAX fsize,
- * this will not likely be available anyway ... */
- min_size = MAX (parse->priv->min_frame_size, fsize);
- av = gst_adapter_available (parse->priv->adapter);
-
- if (G_UNLIKELY (parse->priv->drain)) {
- min_size = av;
- GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
- if (G_UNLIKELY (!min_size)) {
- goto done;
- }
- }
+ /* note: if subclass indicates MAX fsize,
+ * this will not likely be available anyway ... */
+ min_size = MAX (parse->priv->min_frame_size, fsize);
+ av = gst_adapter_available (parse->priv->adapter);
- /* Collect at least min_frame_size bytes */
- if (av < min_size) {
- GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)",
- av);
+ if (G_UNLIKELY (parse->priv->drain)) {
+ min_size = av;
+ GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
+ if (G_UNLIKELY (!min_size)) {
goto done;
}
+ }
- /* move along with upstream timestamp (if any),
- * but interpolate in between */
- timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
- if (GST_CLOCK_TIME_IS_VALID (timestamp) &&
- (parse->priv->prev_ts != timestamp)) {
- parse->priv->prev_ts = parse->priv->next_ts = timestamp;
- }
-
- /* always pass all available data */
- data = gst_adapter_map (parse->priv->adapter, av);
- /* arrange for actual data to be copied if subclass tries to,
- * since what is passed is tied to the adapter */
- tmpbuf = gst_buffer_new ();
- gst_buffer_take_memory (tmpbuf, -1,
- gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY |
- GST_MEMORY_FLAG_NO_SHARE, (gpointer) data, NULL, av, 0, av));
-
- /* keep the adapter mapped, so keep track of what has to be flushed */
- ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush);
- tmpbuf = NULL;
-
- gst_adapter_unmap (parse->priv->adapter);
- if (ret != GST_FLOW_OK) {
- goto done;
- }
- if (skip > 0) {
- GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
- if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
- /* reverse playback, and no frames found yet, so we are skipping
- * the leading part of a fragment, which may form the tail of
- * fragment coming later, hopefully subclass skips efficiently ... */
- timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
- outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip);
- outbuf = gst_buffer_make_writable (outbuf);
- GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
- parse->priv->buffers_pending =
- g_slist_prepend (parse->priv->buffers_pending, outbuf);
- outbuf = NULL;
- } else {
- gst_adapter_flush (parse->priv->adapter, skip);
- }
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->offset += skip;
- parse->priv->discont = TRUE;
- } else if (!flush) {
- GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, "
- "breaking to get more data");
- goto done;
- }
- if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
- goto done;
- }
+ /* Collect at least min_frame_size bytes */
+ if (av < min_size) {
+ GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)", av);
+ goto done;
}
- /* Grab lock to prevent a race with FLUSH_START handler */
- GST_PAD_STREAM_LOCK (parse->srcpad);
+ /* move along with upstream timestamp (if any),
+ * but interpolate in between */
+ timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
+ if (GST_CLOCK_TIME_IS_VALID (timestamp) &&
+ (parse->priv->prev_ts != timestamp)) {
+ parse->priv->prev_ts = parse->priv->next_ts = timestamp;
+ }
- /* FLUSH_START event causes the "flushing" flag to be set. In this
- * case we can leave the frame pushing loop */
- if (parse->priv->flushing) {
- GST_PAD_STREAM_UNLOCK (parse->srcpad);
- break;
+ /* always pass all available data */
+ data = gst_adapter_map (parse->priv->adapter, av);
+ /* arrange for actual data to be copied if subclass tries to,
+ * since what is passed is tied to the adapter */
+ tmpbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY |
+ GST_MEMORY_FLAG_NO_SHARE, (gpointer) data, av, 0, av, NULL, NULL);
+
+ /* keep the adapter mapped, so keep track of what has to be flushed */
+ ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush);
+ tmpbuf = NULL;
+
+ /* probably already implicitly unmapped due to adapter operation,
+ * but for good measure ... */
+ gst_adapter_unmap (parse->priv->adapter);
+ if (ret != GST_FLOW_OK) {
+ goto done;
+ }
+ if (skip == 0 && flush == 0) {
+ GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, "
+ "breaking to get more data");
+ goto done;
}
}
if (parse->priv->exact_position) {
offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL);
} else {
- if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts,
+ if (!gst_base_parse_convert (parse, GST_FORMAT_TIME, ts,
GST_FORMAT_BYTES, &offset)) {
GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based");
}
/* offset will increase again as fragment is processed/parsed */
parse->priv->last_offset = offset;
+ gst_base_parse_start_fragment (parse);
gst_adapter_push (parse->priv->adapter, buffer);
- ret = gst_base_parse_process_fragment (parse, FALSE);
+ ret = gst_base_parse_finish_fragment (parse, TRUE);
if (ret != GST_FLOW_OK)
goto exit;
gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
gboolean full)
{
- GstBuffer *buffer, *outbuf;
+ GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK;
guint fsize, min_size;
gint flushed = 0;
while (TRUE) {
min_size = MAX (parse->priv->min_frame_size, fsize);
+ GST_LOG_OBJECT (parse, "reading buffer size %u", min_size);
+
ret = gst_base_parse_pull_range (parse, min_size, &buffer);
if (ret != GST_FLOW_OK)
goto done;
/* if we got a short read, inform subclass we are draining leftover
* and no more is to be expected */
- if (gst_buffer_get_size (buffer) < min_size)
+ if (gst_buffer_get_size (buffer) < min_size) {
+ GST_LOG_OBJECT (parse, "... but did not get that; marked draining");
parse->priv->drain = TRUE;
+ }
if (parse->priv->detecting) {
ret = klass->detect (parse, buffer);
/* Else handle this buffer normally */
}
- /* might need it later on */
- gst_buffer_ref (buffer);
ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed);
if (ret != GST_FLOW_OK)
break;
- if (skip > 0) {
- GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
- if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
- /* reverse playback, and no frames found yet, so we are skipping
- * the leading part of a fragment, which may form the tail of
- * fragment coming later, hopefully subclass skips efficiently ... */
- outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip);
- parse->priv->buffers_pending =
- g_slist_prepend (parse->priv->buffers_pending, outbuf);
- outbuf = NULL;
- }
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->offset += skip;
- parse->priv->discont = TRUE;
- } else {
- /* default to reasonable increase */
- fsize += 64 * 1024;
- }
- /* no longer needed */
- gst_buffer_unref (buffer);
- /* changed offset means something happened,
+ /* something flushed means something happened,
* and we should bail out of this loop so as not to occupy
* the task thread indefinitely */
if (flushed) {
ret = GST_FLOW_EOS;
break;
}
- parse->priv->drain = FALSE;
- if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
- goto done;
+ /* otherwise, get some more data
+ * note that is checked this does not happen indefinitely */
+ if (!skip) {
+ GST_LOG_OBJECT (parse, "getting some more data");
+ fsize += 64 * 1024;
}
+ parse->priv->drain = FALSE;
}
done:
parse->segment.position >= parse->segment.stop) {
GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
/* push what was accumulated during loop run */
- gst_base_parse_process_fragment (parse, TRUE);
+ gst_base_parse_finish_fragment (parse, FALSE);
/* force previous fragment */
parse->priv->offset = -1;
ret = GST_FLOW_OK;
push_eos = TRUE;
}
if (push_eos) {
- /* newsegment before eos */
- if (parse->priv->pending_segment) {
- gst_pad_push_event (parse->srcpad, parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
+ /* Push pending events, including NEWSEGMENT events */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
+
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
+ }
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
+
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
}
gst_object_unref (parse);
gst_base_parse_sink_activate (GstPad * sinkpad, GstObject * parent)
{
GstBaseParse *parse;
- gboolean result = TRUE;
GstQuery *query;
gboolean pull_mode;
GST_DEBUG_OBJECT (parse, "sink activate");
query = gst_query_new_scheduling ();
- result = gst_pad_peer_query (sinkpad, query);
- if (result) {
- pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
- } else {
- pull_mode = FALSE;
+ if (!gst_pad_peer_query (sinkpad, query)) {
+ gst_query_unref (query);
+ goto baseparse_push;
}
+
+ pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
gst_query_unref (query);
- if (pull_mode) {
- GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
- result = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE);
- } else {
+ if (!pull_mode)
+ goto baseparse_push;
+
+ GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
+ if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE))
+ goto baseparse_push;
+
+ return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop,
+ sinkpad);
+ /* fallback */
+baseparse_push:
+ {
GST_DEBUG_OBJECT (parse, "trying to activate in push mode");
- result = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE);
+ return gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE);
}
-
- GST_DEBUG_OBJECT (parse, "sink activate return %d", result);
- return result;
}
static gboolean
gst_base_parse_sink_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active)
{
- gboolean result = TRUE;
+ gboolean result;
GstBaseParse *parse;
parse = GST_BASE_PARSE (parent);
GST_DEBUG_OBJECT (parse, "sink activate mode %d, %d", mode, active);
- result = gst_base_parse_activate (parse, active);
-
- if (result) {
- switch (mode) {
- case GST_PAD_MODE_PULL:
- if (active) {
- parse->priv->pending_segment =
- gst_event_new_segment (&parse->segment);
- result &=
- gst_pad_start_task (pad, (GstTaskFunction) gst_base_parse_loop,
- pad);
- } else {
- result &= gst_pad_stop_task (pad);
- }
- break;
- default:
- break;
- }
+ if (!gst_base_parse_activate (parse, active))
+ goto activate_failed;
+
+ switch (mode) {
+ case GST_PAD_MODE_PULL:
+ if (active) {
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events,
+ gst_event_new_segment (&parse->segment));
+ parse->priv->pending_segment = TRUE;
+ result = TRUE;
+ } else {
+ result = gst_pad_stop_task (pad);
+ }
+ break;
+ default:
+ result = TRUE;
+ break;
}
if (result)
parse->priv->pad_mode = active ? mode : GST_PAD_MODE_NONE;
GST_DEBUG_OBJECT (parse, "sink activate return: %d", result);
return result;
+
+ /* ERRORS */
+activate_failed:
+ {
+ GST_DEBUG_OBJECT (parse, "activate failed");
+ return FALSE;
+ }
}
/**
GST_LOG_OBJECT (parse, "using estimated duration");
*duration = parse->priv->estimated_duration;
res = TRUE;
+ } else {
+ GST_LOG_OBJECT (parse, "cannot estimate duration");
}
GST_LOG_OBJECT (parse, "res: %d, duration %" GST_TIME_FORMAT, res,
/* For any format other than TIME, see if upstream handles
* it directly or fail. For TIME, try upstream, but do it ourselves if
* it fails upstream */
- if (format != GST_FORMAT_TIME) {
- /* default action delegates to upstream */
- res = FALSE;
+ res = gst_pad_push_event (parse->sinkpad, event);
+ if (format != GST_FORMAT_TIME || res)
goto done;
- } else {
- gst_event_ref (event);
- if ((res = gst_pad_push_event (parse->sinkpad, event))) {
- goto done;
- }
- }
/* get flush flag */
flush = flags & GST_SEEK_FLAG_FLUSH;
NULL);
} else {
start_ts = seeksegment.position;
- if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.position,
+ if (!gst_base_parse_convert (parse, format, seeksegment.position,
GST_FORMAT_BYTES, &seekpos))
goto convert_failed;
- if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.stop,
+ if (!gst_base_parse_convert (parse, format, seeksegment.stop,
GST_FORMAT_BYTES, &seekstop))
goto convert_failed;
}
memcpy (&parse->segment, &seeksegment, sizeof (GstSegment));
/* store the newsegment event so it can be sent from the streaming thread. */
- if (parse->priv->pending_segment)
- gst_event_unref (parse->priv->pending_segment);
-
/* This will be sent later in _loop() */
- parse->priv->pending_segment = gst_event_new_segment (&parse->segment);
+ parse->priv->pending_segment = TRUE;
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events,
+ gst_event_new_segment (&parse->segment));
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
}
done:
- /* handled event is ours to free */
- if (res)
- gst_event_unref (event);
return res;
/* ERRORS */