* Copyright (C) 2008 Nokia Corporation. All rights reserved.
* Contact: Stefan Kost <stefan.kost@nokia.com>
* Copyright (C) 2008 Sebastian Dröge <sebastian.droege@collabora.co.uk>.
+ * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* contain a valid frame, this call must return FALSE and optionally
* set the @skipsize value to inform base class that how many bytes
* it needs to skip in order to find a valid frame. @framesize can always
- * indicate a new minimum for current frame parsing. The passed buffer
+ * indicate a new minimum for current frame parsing. Indicating G_MAXUINT
+ * for requested amount means subclass simply needs best available
+ * subsequent data. In push mode this amounts to an additional input buffer
+ * (thus minimal additional latency), in pull mode this amounts to some
+ * arbitrary reasonable buffer size increase. The passed buffer
* is read-only. Note that @check_valid_frame might receive any small
* amount of input data when leftover data is being drained (e.g. at EOS).
* </para></listitem>
* <listitem><para>
* Finally the buffer can be pushed downstream and the parsing loop starts
* over again. Just prior to actually pushing the buffer in question,
- * it is passed to @pre_push_buffer which gives subclass yet one
+ * it is passed to @pre_push_frame which gives subclass yet one
* last chance to examine buffer metadata, or to send some custom (tag)
* events, or to perform custom (segment) filtering.
* </para></listitem>
GST_FORMAT_DEFAULT,
GST_FORMAT_BYTES,
GST_FORMAT_TIME,
- 0
+ GST_FORMAT_UNDEFINED
};
#define GST_BASE_PARSE_GET_PRIVATE(obj) \
gint64 duration;
GstFormat duration_fmt;
gint64 estimated_duration;
+ gint64 estimated_drift;
guint min_frame_size;
gboolean passthrough;
guint bitrate;
guint lead_in, lead_out;
GstClockTime lead_in_ts, lead_out_ts;
+ GstClockTime min_latency, max_latency;
gboolean discont;
gboolean flushing;
/* Segment event that closes the running segment prior to SEEK */
GstEvent *close_segment;
+
+ /* push mode helper frame */
+ GstBaseParseFrame frame;
+
+ /* TRUE if we're still detecting the format, i.e.
+ * if ::detect() is still called for future buffers */
+ gboolean detecting;
+ GList *detect_buffers;
+ guint detect_buffers_size;
};
typedef struct _GstBaseParseSeek
static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event);
-static gboolean gst_base_parse_query (GstPad * pad, GstQuery ** query);
-static gboolean gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_base_parse_query (GstPad * pad, GstQuery * query);
+static GstCaps *gst_base_parse_sink_getcaps (GstPad * pad, GstCaps * filter);
static const GstQueryType *gst_base_parse_get_querytypes (GstPad * pad);
static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer);
g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_send);
parse->priv->buffers_send = NULL;
+
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
}
static void
parse->sinkpad = gst_pad_new_from_template (pad_template, "sink");
gst_pad_set_event_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_sink_event));
- gst_pad_set_setcaps_function (parse->sinkpad,
- GST_DEBUG_FUNCPTR (gst_base_parse_sink_setcaps));
+ gst_pad_set_getcaps_function (parse->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_base_parse_sink_getcaps));
gst_pad_set_chain_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_chain));
gst_pad_set_activate_function (parse->sinkpad,
frame->buffer = NULL;
}
- if (!(frame->_private_flags & GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC))
+ if (!(frame->_private_flags & GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC)) {
g_slice_free (GstBaseParseFrame, frame);
+ } else {
+ memset (frame, 0, sizeof (*frame));
+ }
}
GType
parse->priv->first_frame_ts = GST_CLOCK_TIME_NONE;
parse->priv->first_frame_offset = -1;
parse->priv->estimated_duration = -1;
+ parse->priv->estimated_drift = 0;
parse->priv->next_ts = 0;
parse->priv->syncable = TRUE;
parse->priv->passthrough = FALSE;
g_slist_foreach (parse->priv->pending_seeks, (GFunc) g_free, NULL);
g_slist_free (parse->priv->pending_seeks);
parse->priv->pending_seeks = NULL;
+
+ /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */
+ parse->priv->frame._private_flags |=
+ GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
+ gst_base_parse_frame_free (&parse->priv->frame);
+
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
GST_OBJECT_UNLOCK (parse);
}
GstEvent **eventp;
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CAPS:
+ {
+ GstCaps *caps;
+ GstBaseParseClass *klass;
+
+ klass = GST_BASE_PARSE_GET_CLASS (parse);
+
+ gst_event_parse_caps (event, &caps);
+ GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
+
+ if (klass->set_sink_caps)
+ klass->set_sink_caps (parse, caps);
+
+ /* will send our own caps downstream */
+ gst_event_unref (event);
+ handled = TRUE;
+ break;
+ }
case GST_EVENT_SEGMENT:
{
const GstSegment *in_segment;
gboolean update;
#endif
- in_segment = gst_event_get_segment (event);
+ gst_event_parse_segment (event, &in_segment);
gst_segment_init (&out_segment, GST_FORMAT_TIME);
GST_DEBUG_OBJECT (parse, "segment %" GST_SEGMENT_FORMAT, in_segment);
event = gst_event_new_segment (&out_segment);
- GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. "
+ GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. %"
GST_SEGMENT_FORMAT, in_segment);
} else if (in_segment->format != GST_FORMAT_TIME) {
parse->priv->flushing = FALSE;
parse->priv->discont = TRUE;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
+ parse->priv->frame._private_flags |=
+ GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
+ gst_base_parse_frame_free (&parse->priv->frame);
break;
case GST_EVENT_EOS:
peer = gst_pad_get_peer (parse->sinkpad);
if (peer) {
- GstFormat pformat = GST_FORMAT_BYTES;
gboolean qres = FALSE;
gint64 ptot, dest_value;
- qres = gst_pad_query_duration (peer, &pformat, &ptot);
+ qres = gst_pad_query_duration (peer, GST_FORMAT_BYTES, &ptot);
gst_object_unref (GST_OBJECT (peer));
if (qres) {
- if (gst_base_parse_convert (parse, pformat, ptot,
+ if (gst_base_parse_convert (parse, GST_FORMAT_BYTES, ptot,
GST_FORMAT_TIME, &dest_value)) {
+
+ /* inform if duration changed, but try to avoid spamming */
+ parse->priv->estimated_drift +=
+ dest_value - parse->priv->estimated_duration;
+ if (parse->priv->estimated_drift > GST_SECOND ||
+ parse->priv->estimated_drift < -GST_SECOND) {
+ gst_element_post_message (GST_ELEMENT (parse),
+ gst_message_new_duration (GST_OBJECT (parse),
+ GST_FORMAT_TIME, dest_value));
+ parse->priv->estimated_drift = 0;
+ }
parse->priv->estimated_duration = dest_value;
GST_LOG_OBJECT (parse,
"updated estimated duration to %" GST_TIME_FORMAT,
GstTagList *taglist = NULL;
if (post_min && parse->priv->post_min_bitrate) {
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
GST_TAG_MINIMUM_BITRATE, parse->priv->min_bitrate, NULL);
if (post_avg && parse->priv->post_avg_bitrate) {
if (taglist == NULL)
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
parse->priv->posted_avg_bitrate = parse->priv->avg_bitrate;
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_BITRATE,
if (post_max && parse->priv->post_max_bitrate) {
if (taglist == NULL)
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
GST_TAG_MAXIMUM_BITRATE, parse->priv->max_bitrate, NULL);
guint idx_interval = 0;
query = gst_query_new_seeking (GST_FORMAT_BYTES);
- if (!gst_pad_peer_query (parse->sinkpad, &query)) {
+ if (!gst_pad_peer_query (parse->sinkpad, query)) {
GST_DEBUG_OBJECT (parse, "seeking query failed");
goto done;
}
/* try harder to query upstream size if we didn't get it the first time */
if (seekable && stop == -1) {
- GstFormat fmt = GST_FORMAT_BYTES;
-
GST_DEBUG_OBJECT (parse, "doing duration query to fix up unset stop");
- gst_pad_query_peer_duration (parse->sinkpad, &fmt, &stop);
+ gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop);
}
/* if upstream doesn't know the size, it's likely that it's not seekable in
static void
gst_base_parse_check_upstream (GstBaseParse * parse)
{
- GstFormat fmt = GST_FORMAT_TIME;
gint64 stop;
- if (gst_pad_query_peer_duration (parse->sinkpad, &fmt, &stop))
+ if (gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_TIME, &stop))
if (GST_CLOCK_TIME_IS_VALID (stop) && stop) {
/* upstream has one, accept it also, and no further updates */
gst_base_parse_set_duration (parse, GST_FORMAT_TIME, stop, 0);
if (caps)
gst_caps_unref (caps);
- GST_DEBUG_OBJECT (parse, "media is video == %d", parse->priv->is_video);
+ GST_DEBUG_OBJECT (parse, "media is video: %d", parse->priv->is_video);
}
/* takes ownership of frame */
GST_LOG_OBJECT (parse,
"parsing frame at offset %" G_GUINT64_FORMAT
- " (%#" G_GINT64_MODIFIER "x) of size %d",
+ " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT,
GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer),
gst_buffer_get_size (buffer));
while ((queued_frame = g_queue_pop_head (&parse->priv->queued_frames))) {
gst_base_parse_push_frame (parse, queued_frame);
- gst_base_parse_frame_free (queued_frame);
}
}
buffer = frame->buffer;
GST_LOG_OBJECT (parse,
- "processing buffer of size %d with ts %" GST_TIME_FORMAT
+ "processing buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT
", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
parse->priv->close_segment = NULL;
}
if (G_UNLIKELY (parse->priv->pending_segment)) {
+ GstEvent *pending_segment;
+
+ pending_segment = parse->priv->pending_segment;
+ parse->priv->pending_segment = NULL;
+
GST_DEBUG_OBJECT (parse, "%s push pending segment",
parse->priv->pad_mode == GST_ACTIVATE_PULL ? "loop" : "chain");
- gst_pad_push_event (parse->srcpad, parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
+ gst_pad_push_event (parse->srcpad, pending_segment);
/* have caps; check identity */
gst_base_parse_check_media (parse);
GST_BUFFER_TIMESTAMP (buffer) >
parse->segment.stop + parse->priv->lead_out_ts) {
GST_LOG_OBJECT (parse, "Dropped frame, after segment");
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
} else if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
GST_BUFFER_DURATION_IS_VALID (buffer) &&
GST_CLOCK_TIME_IS_VALID (parse->segment.start) &&
ret = GST_FLOW_OK;
} else if (ret == GST_FLOW_OK) {
if (parse->segment.rate > 0.0) {
+ GST_LOG_OBJECT (parse, "pushing frame (%" G_GSIZE_FORMAT " bytes) now..",
+ size);
ret = gst_pad_push (parse->srcpad, buffer);
- GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) pushed: %s",
- size, gst_flow_get_name (ret));
+ GST_LOG_OBJECT (parse, "frame pushed, flow %s", gst_flow_get_name (ret));
} else {
GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) queued for now",
size);
ret = GST_FLOW_OK;
}
} else {
- gst_buffer_unref (buffer);
GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) not pushed: %s",
size, gst_flow_get_name (ret));
+ gst_buffer_unref (buffer);
/* if we are not sufficiently in control, let upstream decide on EOS */
- if (ret == GST_FLOW_UNEXPECTED &&
+ if (ret == GST_FLOW_EOS &&
(parse->priv->passthrough ||
(parse->priv->pad_mode == GST_ACTIVATE_PUSH &&
!parse->priv->upstream_seekable)))
parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
while (parse->priv->buffers_pending) {
buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
- GST_LOG_OBJECT (parse, "adding pending buffer (size %d)",
+ GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
gst_buffer_get_size (buf));
gst_adapter_push (parse->priv->adapter, buf);
parse->priv->buffers_pending =
/* any trailing unused no longer usable (ideally none) */
if (G_UNLIKELY (gst_adapter_available (parse->priv->adapter))) {
- GST_DEBUG_OBJECT (parse, "discarding %d trailing bytes",
+ GST_DEBUG_OBJECT (parse, "discarding %" G_GSIZE_FORMAT " trailing bytes",
gst_adapter_available (parse->priv->adapter));
gst_adapter_clear (parse->priv->adapter);
}
const guint8 *data;
guint old_min_size = 0, min_size, av;
GstClockTime timestamp;
- GstBaseParseFrame _frame;
GstBaseParseFrame *frame;
parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad));
bclass = GST_BASE_PARSE_GET_CLASS (parse);
- frame = &_frame;
- gst_base_parse_frame_init (frame);
+ if (parse->priv->detecting) {
+ GstBuffer *detect_buf;
+
+ if (parse->priv->detect_buffers_size == 0) {
+ detect_buf = gst_buffer_ref (buffer);
+ } else {
+ GList *l;
+ guint offset = 0;
+
+ detect_buf = gst_buffer_new ();
+
+ for (l = parse->priv->detect_buffers; l; l = l->next) {
+ gsize tmpsize = gst_buffer_get_size (l->data);
+
+ gst_buffer_copy_into (detect_buf, GST_BUFFER_CAST (l->data),
+ GST_BUFFER_COPY_MEMORY, offset, tmpsize);
+ offset += tmpsize;
+ }
+ if (buffer)
+ gst_buffer_copy_into (detect_buf, buffer, GST_BUFFER_COPY_MEMORY,
+ offset, gst_buffer_get_size (buffer));
+ }
+
+ ret = bclass->detect (parse, detect_buf);
+ gst_buffer_unref (detect_buf);
+
+ if (ret == GST_FLOW_OK) {
+ GList *l;
+
+ /* Detected something */
+ parse->priv->detecting = FALSE;
+
+ for (l = parse->priv->detect_buffers; l; l = l->next) {
+ if (ret == GST_FLOW_OK && !parse->priv->flushing)
+ ret =
+ gst_base_parse_chain (GST_BASE_PARSE_SINK_PAD (parse),
+ GST_BUFFER_CAST (l->data));
+ else
+ gst_buffer_unref (GST_BUFFER_CAST (l->data));
+ }
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+
+ if (ret != GST_FLOW_OK) {
+ return ret;
+ }
+
+ /* Handle the current buffer */
+ } else if (ret == GST_FLOW_NOT_NEGOTIATED) {
+ /* Still detecting, append buffer or error out if draining */
+
+ if (parse->priv->drain) {
+ GST_DEBUG_OBJECT (parse, "Draining but did not detect format yet");
+ return GST_FLOW_ERROR;
+ } else if (parse->priv->flushing) {
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref,
+ NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+ } else {
+ parse->priv->detect_buffers =
+ g_list_append (parse->priv->detect_buffers, buffer);
+ parse->priv->detect_buffers_size += gst_buffer_get_size (buffer);
+ return GST_FLOW_OK;
+ }
+ } else {
+ /* Something went wrong, subclass responsible for error reporting */
+ return ret;
+ }
+
+ /* And now handle the current buffer if detection worked */
+ }
+
+ frame = &parse->priv->frame;
if (G_LIKELY (buffer)) {
- GST_LOG_OBJECT (parse, "buffer size: %d, offset = %" G_GINT64_FORMAT,
+ GST_LOG_OBJECT (parse,
+ "buffer size: %" G_GSIZE_FORMAT ", offset = %" G_GINT64_FORMAT,
gst_buffer_get_size (buffer), GST_BUFFER_OFFSET (buffer));
if (G_UNLIKELY (parse->priv->passthrough)) {
+ gst_base_parse_frame_init (frame);
frame->buffer = gst_buffer_make_writable (buffer);
return gst_base_parse_push_frame (parse, frame);
}
gst_adapter_push (parse->priv->adapter, buffer);
}
+ if (G_UNLIKELY (buffer &&
+ GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
+ frame->_private_flags |= GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
+ gst_base_parse_frame_free (frame);
+ }
+
/* Parse and push as many frames as possible */
/* Stop either when adapter is empty or we are flushing */
while (!parse->priv->flushing) {
gboolean res;
+ /* maintain frame state for a single frame parsing round across _chain calls,
+ * so only init when needed */
+ if (!frame->_private_flags)
+ gst_base_parse_frame_init (frame);
+
tmpbuf = gst_buffer_new ();
old_min_size = 0;
/* Synchronization loop */
for (;;) {
+ /* note: if subclass indicates MAX fsize,
+ * this will not likely be available anyway ... */
min_size = MAX (parse->priv->min_frame_size, fsize);
av = gst_adapter_available (parse->priv->adapter);
/* always pass all available data */
data = gst_adapter_map (parse->priv->adapter, av);
- gst_buffer_take_memory (tmpbuf,
+ gst_buffer_take_memory (tmpbuf, -1,
gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
- (gpointer) data, NULL, min_size, 0, min_size));
+ (gpointer) data, NULL, av, 0, av));
GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset;
if (parse->priv->discont) {
res = bclass->check_valid_frame (parse, frame, &fsize, &skip);
gst_adapter_unmap (parse->priv->adapter, 0);
gst_buffer_replace (&frame->buffer, NULL);
+ gst_buffer_remove_memory_range (tmpbuf, 0, -1);
if (res) {
if (gst_adapter_available (parse->priv->adapter) < fsize) {
- GST_DEBUG_OBJECT (parse,
- "found valid frame but not enough data available (only %d bytes)",
+ GST_DEBUG_OBJECT (parse, "found valid frame but not enough data"
+ " available (only %" G_GSIZE_FORMAT " bytes)",
gst_adapter_available (parse->priv->adapter));
gst_buffer_unref (tmpbuf);
goto done;
if (gst_buffer_get_size (parse->priv->cache) < size) {
GST_DEBUG_OBJECT (parse, "Returning short buffer at offset %"
- G_GUINT64_FORMAT ": wanted %u bytes, got %u bytes", parse->priv->offset,
- size, gst_buffer_get_size (parse->priv->cache));
+ G_GUINT64_FORMAT ": wanted %u bytes, got %" G_GSIZE_FORMAT " bytes",
+ parse->priv->offset, size, gst_buffer_get_size (parse->priv->cache));
*buffer = parse->priv->cache;
parse->priv->cache = NULL;
if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) {
GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT,
GST_TIME_ARGS (parse->segment.start));
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
goto exit;
}
if (parse->priv->exact_position) {
offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL);
} else {
- GstFormat dstformat = GST_FORMAT_BYTES;
-
if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts,
- &dstformat, &offset)) {
+ GST_FORMAT_BYTES, &offset)) {
GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based");
}
}
GST_LOG_OBJECT (parse, "scanning for frame at offset %" G_GUINT64_FORMAT
" (%#" G_GINT64_MODIFIER "x)", parse->priv->offset, parse->priv->offset);
+ /* let's make this efficient for all subclass once and for all;
+ * maybe it does not need this much, but in the latter case, we know we are
+ * in pull mode here and might as well try to read and supply more anyway
+ * (so does the buffer caching mechanism) */
+ fsize = 64 * 1024;
+
while (TRUE) {
gboolean res;
if (gst_buffer_get_size (buffer) < min_size)
parse->priv->drain = TRUE;
+ if (parse->priv->detecting) {
+ ret = klass->detect (parse, buffer);
+ if (ret == GST_FLOW_NOT_NEGOTIATED) {
+ /* If draining we error out, otherwise request a buffer
+ * with 64kb more */
+ if (parse->priv->drain) {
+ gst_buffer_unref (buffer);
+ GST_ERROR_OBJECT (parse, "Failed to detect format but draining");
+ return GST_FLOW_ERROR;
+ } else {
+ fsize += 64 * 1024;
+ gst_buffer_unref (buffer);
+ continue;
+ }
+ } else if (ret != GST_FLOW_OK) {
+ gst_buffer_unref (buffer);
+ GST_ERROR_OBJECT (parse, "detect() returned %s",
+ gst_flow_get_name (ret));
+ return ret;
+ }
+
+ /* Else handle this buffer normally */
+ }
+
skip = -1;
gst_base_parse_frame_update (parse, frame, buffer);
res = klass->check_valid_frame (parse, frame, &fsize, &skip);
if (!parse->priv->discont)
parse->priv->sync_offset = parse->priv->offset;
parse->priv->discont = TRUE;
- /* something changed least; nullify loop check */
+ /* something changed at least; nullify loop check */
+ if (fsize == G_MAXUINT)
+ fsize = old_min_size + 64 * 1024;
old_min_size = 0;
}
/* skip == 0 should imply subclass set min_size to need more data;
goto done;
if (gst_buffer_get_size (outbuf) < fsize) {
gst_buffer_unref (outbuf);
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
}
}
ret = gst_base_parse_handle_and_push_frame (parse, klass, &frame);
/* eat expected eos signalling past segment in reverse playback */
- if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED &&
+ if (parse->segment.rate < 0.0 && ret == GST_FLOW_EOS &&
parse->segment.position >= parse->segment.stop) {
GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
/* push what was accumulated during loop run */
}
done:
- if (ret == GST_FLOW_UNEXPECTED)
+ if (ret == GST_FLOW_EOS)
goto eos;
else if (ret != GST_FLOW_OK)
goto pause;
/* ERRORS */
eos:
{
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
GST_DEBUG_OBJECT (parse, "eos");
/* fall-through */
}
gst_flow_get_name (ret));
gst_pad_pause_task (parse->sinkpad);
- if (ret == GST_FLOW_UNEXPECTED) {
+ if (ret == GST_FLOW_EOS) {
/* handle end-of-stream/segment */
if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
}
push_eos = TRUE;
}
- } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
+ } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
/* for fatal errors we post an error message, wrong-state is
* not fatal because it happens due to flushes and only means
* that we should stop now. */
{
GstBaseParse *parse;
gboolean result = TRUE;
+ GstQuery *query;
+ gboolean pull_mode;
parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
GST_DEBUG_OBJECT (parse, "sink activate");
- if (gst_pad_check_pull_range (sinkpad)) {
+ query = gst_query_new_scheduling ();
+ result = gst_pad_peer_query (sinkpad, query);
+ if (result) {
+ gst_query_parse_scheduling (query, &pull_mode, NULL, NULL, NULL, NULL,
+ NULL);
+ } else {
+ pull_mode = FALSE;
+ }
+ gst_query_unref (query);
+
+ if (pull_mode) {
GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
result = gst_pad_activate_pull (sinkpad, TRUE);
} else {
gst_base_parse_activate (GstBaseParse * parse, gboolean active)
{
GstBaseParseClass *klass;
- gboolean result = FALSE;
+ gboolean result = TRUE;
GST_DEBUG_OBJECT (parse, "activate %d", active);
if (active) {
if (parse->priv->pad_mode == GST_ACTIVATE_NONE && klass->start)
result = klass->start (parse);
+
+ /* If the subclass implements ::detect we want to
+ * call it for the first buffers now */
+ parse->priv->detecting = (klass->detect != NULL);
} else {
/* We must make sure streaming has finished before resetting things
* and calling the ::stop vfunc */
* parsing, and the parser should operate in passthrough mode (which only
* applies when operating in push mode). That is, incoming buffers are
* pushed through unmodified, i.e. no @check_valid_frame or @parse_frame
- * callbacks will be invoked, but @pre_push_buffer will still be invoked,
+ * callbacks will be invoked, but @pre_push_frame will still be invoked,
* so subclass can perform as much or as little is appropriate for
- * passthrough semantics in @pre_push_buffer.
+ * passthrough semantics in @pre_push_frame.
*
* Since: 0.10.33
*/
GST_INFO_OBJECT (parse, "passthrough: %s", (passthrough) ? "yes" : "no");
}
+/**
+ * gst_base_parse_set_latency:
+ * @parse: a #GstBaseParse
+ * @min_latency: minimum parse latency
+ * @max_latency: maximum parse latency
+ *
+ * Sets the minimum and maximum (which may likely be equal) latency introduced
+ * by the parsing process. If there is such a latency, which depends on the
+ * particular parsing of the format, it typically corresponds to 1 frame duration.
+ *
+ * Since: 0.10.34
+ */
+void
+gst_base_parse_set_latency (GstBaseParse * parse, GstClockTime min_latency,
+ GstClockTime max_latency)
+{
+ GST_OBJECT_LOCK (parse);
+ parse->priv->min_latency = min_latency;
+ parse->priv->max_latency = max_latency;
+ GST_OBJECT_UNLOCK (parse);
+ GST_INFO_OBJECT (parse, "min/max latency %" GST_TIME_FORMAT ", %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (min_latency),
+ GST_TIME_ARGS (max_latency));
+}
+
static gboolean
gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format,
GstClockTime * duration)
GST_QUERY_FORMATS,
GST_QUERY_SEEKING,
GST_QUERY_CONVERT,
- 0
+ GST_QUERY_NONE
};
return list;
}
static gboolean
-gst_base_parse_query (GstPad * pad, GstQuery ** query)
+gst_base_parse_query (GstPad * pad, GstQuery * query)
{
GstBaseParse *parse;
gboolean res = FALSE;
GST_LOG_OBJECT (parse, "handling query: %" GST_PTR_FORMAT, query);
- switch (GST_QUERY_TYPE (*query)) {
+ switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
{
gint64 dest_value;
GstFormat format;
GST_DEBUG_OBJECT (parse, "position query");
- gst_query_parse_position (*query, &format, NULL);
-
- GST_OBJECT_LOCK (parse);
- if (format == GST_FORMAT_BYTES) {
- dest_value = parse->priv->offset;
- res = TRUE;
- } else if (format == parse->segment.format &&
- GST_CLOCK_TIME_IS_VALID (parse->segment.position)) {
- dest_value = parse->segment.position;
- res = TRUE;
- }
- GST_OBJECT_UNLOCK (parse);
+ gst_query_parse_position (query, &format, NULL);
- if (res) {
- *query = gst_query_make_writable (*query);
- gst_query_set_position (*query, format, dest_value);
- } else {
- res = gst_pad_query_default (pad, query);
+ /* try upstream first */
+ res = gst_pad_query_default (pad, query);
+ if (!res) {
+ /* Fall back on interpreting segment */
+ GST_OBJECT_LOCK (parse);
+ if (format == GST_FORMAT_BYTES) {
+ dest_value = parse->priv->offset;
+ res = TRUE;
+ } else if (format == parse->segment.format &&
+ GST_CLOCK_TIME_IS_VALID (parse->segment.position)) {
+ dest_value = gst_segment_to_stream_time (&parse->segment,
+ parse->segment.format, parse->segment.position);
+ res = TRUE;
+ }
+ GST_OBJECT_UNLOCK (parse);
if (!res) {
/* no precise result, upstream no idea either, then best estimate */
/* priv->offset is updated in both PUSH/PULL modes */
res = gst_base_parse_convert (parse,
GST_FORMAT_BYTES, parse->priv->offset, format, &dest_value);
}
+ if (res)
+ gst_query_set_position (query, format, dest_value);
}
break;
}
GstClockTime duration;
GST_DEBUG_OBJECT (parse, "duration query");
- gst_query_parse_duration (*query, &format, NULL);
+ gst_query_parse_duration (query, &format, NULL);
/* consult upstream */
res = gst_pad_query_default (pad, query);
/* otherwise best estimate from us */
if (!res) {
res = gst_base_parse_get_duration (parse, format, &duration);
- if (res) {
- *query = gst_query_make_writable (*query);
- gst_query_set_duration (*query, format, duration);
- }
+ if (res)
+ gst_query_set_duration (query, format, duration);
}
break;
}
gboolean seekable = FALSE;
GST_DEBUG_OBJECT (parse, "seeking query");
- gst_query_parse_seeking (*query, &fmt, NULL, NULL, NULL);
+ gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
/* consult upstream */
res = gst_pad_query_default (pad, query);
/* we may be able to help if in TIME */
if (fmt == GST_FORMAT_TIME && gst_base_parse_is_seekable (parse)) {
- gst_query_parse_seeking (*query, &fmt, &seekable, NULL, NULL);
+ gst_query_parse_seeking (query, &fmt, &seekable, NULL, NULL);
/* already OK if upstream takes care */
GST_LOG_OBJECT (parse, "upstream handled %d, seekable %d",
res, seekable);
GST_LOG_OBJECT (parse, "already determine upstream seekabled: %d",
seekable);
}
- *query = gst_query_make_writable (*query);
- gst_query_set_seeking (*query, GST_FORMAT_TIME, seekable, 0,
- duration);
+ gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0, duration);
res = TRUE;
}
}
break;
}
case GST_QUERY_FORMATS:
- *query = gst_query_make_writable (*query);
- gst_query_set_formatsv (*query, 3, fmtlist);
+ gst_query_set_formatsv (query, 3, fmtlist);
res = TRUE;
break;
case GST_QUERY_CONVERT:
GstFormat src_format, dest_format;
gint64 src_value, dest_value;
- gst_query_parse_convert (*query, &src_format, &src_value,
+ gst_query_parse_convert (query, &src_format, &src_value,
&dest_format, &dest_value);
res = gst_base_parse_convert (parse, src_format, src_value,
dest_format, &dest_value);
if (res) {
- *query = gst_query_make_writable (*query);
- gst_query_set_convert (*query, src_format, src_value,
+ gst_query_set_convert (query, src_format, src_value,
dest_format, dest_value);
}
break;
}
+ case GST_QUERY_LATENCY:
+ {
+ if ((res = gst_pad_peer_query (parse->sinkpad, query))) {
+ gboolean live;
+ GstClockTime min_latency, max_latency;
+
+ gst_query_parse_latency (query, &live, &min_latency, &max_latency);
+ GST_DEBUG_OBJECT (parse, "Peer latency: live %d, min %"
+ GST_TIME_FORMAT " max %" GST_TIME_FORMAT, live,
+ GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+
+ GST_OBJECT_LOCK (parse);
+ /* add our latency */
+ if (min_latency != -1)
+ min_latency += parse->priv->min_latency;
+ if (max_latency != -1)
+ max_latency += parse->priv->max_latency;
+ GST_OBJECT_UNLOCK (parse);
+
+ gst_query_set_latency (query, live, min_latency, max_latency);
+ }
+ break;
+ }
default:
res = gst_pad_query_default (pad, query);
break;
buf = frame.buffer;
GST_LOG_OBJECT (parse,
"peek parsing frame at offset %" G_GUINT64_FORMAT
- " (%#" G_GINT64_MODIFIER "x) of size %d",
+ " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT,
GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET (buf),
gst_buffer_get_size (buf));
GST_TIME_ARGS (time), newpos);
ret = gst_base_parse_find_frame (parse, &newpos, &newtime, &dur);
- if (ret == GST_FLOW_UNEXPECTED) {
+ if (ret == GST_FLOW_EOS) {
/* heuristic HACK */
hpos = MAX (lpos, hpos - chunk);
continue;
gboolean flush, update, res = TRUE, accurate;
gint64 cur, stop, seekpos, seekstop;
GstSegment seeksegment = { 0, };
- GstFormat dstformat;
GstClockTime start_ts;
gst_event_parse_seek (event, &rate, &format, &flags,
NULL);
} else {
start_ts = seeksegment.position;
- dstformat = GST_FORMAT_BYTES;
if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.position,
- &dstformat, &seekpos))
+ GST_FORMAT_BYTES, &seekpos))
goto convert_failed;
if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.stop,
- &dstformat, &seekstop))
+ GST_FORMAT_BYTES, &seekstop))
goto convert_failed;
}
/* prepare for streaming again */
if (flush) {
GST_DEBUG_OBJECT (parse, "sending flush stop");
- gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ());
- gst_pad_push_event (parse->sinkpad, gst_event_new_flush_stop ());
+ gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop (TRUE));
+ gst_pad_push_event (parse->sinkpad, gst_event_new_flush_stop (TRUE));
gst_base_parse_clear_queues (parse);
} else {
/* keep track of our position */
} else {
GstEvent *new_event;
GstBaseParseSeek *seek;
+ GstSeekFlags flags = (flush ? GST_SEEK_FLAG_FLUSH : GST_SEEK_FLAG_NONE);
/* The only thing we need to do in PUSH-mode is to send the
seek event (in bytes) to upstream. Segment / flush handling happens
in corresponding src event handlers */
GST_DEBUG_OBJECT (parse, "seek in PUSH mode");
- if (seekstop >= 0 && seekpos <= seekpos)
+ if (seekstop >= 0 && seekstop <= seekpos)
seekstop = seekpos;
- new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush,
+ new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags,
GST_SEEK_TYPE_SET, seekpos, stop_type, seekstop);
/* store segment info so its precise details can be reconstructed when
}
}
-static gboolean
-gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps)
+static GstCaps *
+gst_base_parse_sink_getcaps (GstPad * pad, GstCaps * filter)
{
GstBaseParse *parse;
GstBaseParseClass *klass;
- gboolean res = TRUE;
+ GstCaps *caps;
- parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
+ parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
klass = GST_BASE_PARSE_GET_CLASS (parse);
+ g_assert (pad == GST_BASE_PARSE_SINK_PAD (parse));
- GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
+ if (klass->get_sink_caps)
+ caps = klass->get_sink_caps (parse, filter);
+ else
+ caps = gst_pad_proxy_getcaps (pad, filter);
+ gst_object_unref (parse);
- if (klass->set_sink_caps)
- res = klass->set_sink_caps (parse, caps);
+ GST_LOG_OBJECT (parse, "sink getcaps returning caps %" GST_PTR_FORMAT, caps);
+
+ return caps;
- return res;
}
static void