X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstbaseparse.c;h=27661f1175cfa84e806da1971565c83362abec07;hb=eadebb172d69d331fab58708e4982b46ab8c1f38;hp=09dff8e9c04653744856afe1d6f61d644209d16d;hpb=b5c3e254b10e6332397ba59fd5b6389f8a80741d;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c index 09dff8e..27661f1 100644 --- a/libs/gst/base/gstbaseparse.c +++ b/libs/gst/base/gstbaseparse.c @@ -50,14 +50,16 @@ * * Set-up phase * - * GstBaseParse class calls @set_sink_caps to inform the subclass about - * incoming sinkpad caps. Subclass should set the srcpad caps accordingly. - * - * * GstBaseParse calls @start to inform subclass that data processing is * about to start now. * * + * GstBaseParse class calls @set_sink_caps to inform the subclass about + * incoming sinkpad caps. Subclass could already set the srcpad caps + * accordingly, but this might be delayed until calling + * gst_base_parse_finish_frame() with a non-queued frame. + * + * * At least at this point subclass needs to tell the GstBaseParse class * how big data chunks it wants to receive (min_frame_size). It can do * this with gst_base_parse_set_min_frame_size(). @@ -78,34 +80,39 @@ * * * A buffer of (at least) min_frame_size bytes is passed to subclass with - * @check_valid_frame. Subclass checks the contents and returns TRUE - * if the buffer contains a valid frame. It also needs to set the - * @framesize according to the detected frame size. If buffer didn't - * contain a valid frame, this call must return FALSE and optionally - * set the @skipsize value to inform base class that how many bytes - * it needs to skip in order to find a valid frame. @framesize can always - * indicate a new minimum for current frame parsing. Indicating G_MAXUINT - * for requested amount means subclass simply needs best available - * subsequent data. In push mode this amounts to an additional input buffer - * (thus minimal additional latency), in pull mode this amounts to some - * arbitrary reasonable buffer size increase. The passed buffer - * is read-only. Note that @check_valid_frame might receive any small - * amount of input data when leftover data is being drained (e.g. at EOS). - * - * - * After valid frame is found, it will be passed again to subclass with - * @parse_frame call. Now subclass is responsible for parsing the - * frame contents and setting the caps, and buffer metadata (e.g. - * buffer timestamp and duration, or keyframe if applicable). + * @handle_frame. Subclass checks the contents and can optionally + * return GST_FLOW_OK along with an amount of data to be skipped to find + * a valid frame (which will result in a subsequent DISCONT). + * If, otherwise, the buffer does not hold a complete frame, + * @handle_frame can merely return and will be called again when additional + * data is available. In push mode this amounts to an + * additional input buffer (thus minimal additional latency), in pull mode + * this amounts to some arbitrary reasonable buffer size increase. + * Of course, gst_base_parse_set_min_size() could also be used if a very + * specific known amount of additional data is required. + * If, however, the buffer holds a complete valid frame, it can pass + * the size of this frame to gst_base_parse_finish_frame(). + * If acting as a converter, it can also merely indicate consumed input data + * while simultaneously providing custom output data. + * Note that baseclass performs some processing (such as tracking + * overall consumed data rate versus duration) for each finished frame, + * but other state is only updated upon each call to @handle_frame + * (such as tracking upstream input timestamp). + * + * Subclass is also responsible for setting the buffer metadata + * (e.g. buffer timestamp and duration, or keyframe if applicable). * (although the latter can also be done by GstBaseParse if it is * appropriately configured, see below). Frame is provided with * timestamp derived from upstream (as much as generally possible), * duration obtained from configuration (see below), and offset * if meaningful (in pull mode). + * + * Note that @check_valid_frame might receive any small + * amount of input data when leftover data is being drained (e.g. at EOS). * * - * Finally the buffer can be pushed downstream and the parsing loop starts - * over again. Just prior to actually pushing the buffer in question, + * As part of finish frame processing, + * just prior to actually pushing the buffer in question, * it is passed to @pre_push_frame which gives subclass yet one * last chance to examine buffer metadata, or to send some custom (tag) * events, or to perform custom (segment) filtering. @@ -154,12 +161,9 @@ * done with gst_base_parse_set_min_frame_size() function. * * - * Examine data chunks passed to subclass with @check_valid_frame - * and tell if they contain a valid frame - * - * - * Set the caps and timestamp to frame that is passed to subclass with - * @parse_frame function. + * Examine data chunks passed to subclass with @handle_frame and pass + * proper frame(s) to gst_base_parse_finish_frame(), and setting src pad + * caps and timestamps on frame. * * Provide conversion functions * @@ -205,6 +209,11 @@ #include "gstbaseparse.h" +/* FIXME: get rid of old GstIndex code */ +#include "gstindex.h" +#include "gstindex.c" +#include "gstmemindex.c" + #define GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC (1 << 0) #define MIN_FRAMES_TO_POST_BITRATE 10 @@ -226,7 +235,7 @@ static const GstFormat fmtlist[] = { struct _GstBaseParsePrivate { - GstPadActivateMode pad_mode; + GstPadMode pad_mode; GstAdapter *adapter; @@ -257,6 +266,7 @@ struct _GstBaseParsePrivate GstClockTime frame_duration; gboolean seen_keyframe; gboolean is_video; + gint flushed; guint64 framecount; guint64 bytecount; @@ -273,8 +283,6 @@ struct _GstBaseParsePrivate guint max_bitrate; guint posted_avg_bitrate; - GList *pending_events; - /* frames/buffers that are queued and ready to go on OK */ GQueue queued_frames; @@ -284,7 +292,7 @@ struct _GstBaseParsePrivate GstIndex *index; gint index_id; gboolean own_index; - GStaticMutex index_lock; + GMutex index_lock; /* seek table entries only maintained if upstream is BYTE seekable */ gboolean upstream_seekable; @@ -304,19 +312,25 @@ struct _GstBaseParsePrivate /* reverse playback */ GSList *buffers_pending; + GSList *buffers_head; GSList *buffers_queued; GSList *buffers_send; GstClockTime last_ts; gint64 last_offset; + /* Pending serialized events */ + GList *pending_events; /* Newsegment event to be sent after SEEK */ - GstEvent *pending_segment; + gboolean pending_segment; - /* Segment event that closes the running segment prior to SEEK */ - GstEvent *close_segment; - - /* push mode helper frame */ - GstBaseParseFrame frame; + /* offset of last parsed frame/data */ + gint64 prev_offset; + /* force a new frame, regardless of offset */ + gboolean new_frame; + /* whether we are merely scanning for a frame */ + gboolean scanning; + /* ... and resulting frame, if any */ + GstBaseParseFrame *scanned_frame; /* TRUE if we're still detecting the format, i.e. * if ::detect() is still called for future buffers */ @@ -333,6 +347,11 @@ typedef struct _GstBaseParseSeek GstClockTime start_ts; } GstBaseParseSeek; +#define GST_BASE_PARSE_INDEX_LOCK(parse) \ + g_mutex_lock (&parse->priv->index_lock); +#define GST_BASE_PARSE_INDEX_UNLOCK(parse) \ + g_mutex_unlock (&parse->priv->index_lock); + static GstElementClass *parent_class = NULL; static void gst_base_parse_class_init (GstBaseParseClass * klass); @@ -371,29 +390,33 @@ static GstStateChangeReturn gst_base_parse_change_state (GstElement * element, GstStateChange transition); static void gst_base_parse_reset (GstBaseParse * parse); +#if 0 static void gst_base_parse_set_index (GstElement * element, GstIndex * index); static GstIndex *gst_base_parse_get_index (GstElement * element); +#endif -static gboolean gst_base_parse_sink_activate (GstPad * sinkpad); -static gboolean gst_base_parse_sink_activate_push (GstPad * pad, - gboolean active); -static gboolean gst_base_parse_sink_activate_pull (GstPad * pad, - gboolean active); +static gboolean gst_base_parse_sink_activate (GstPad * sinkpad, + GstObject * parent); +static gboolean gst_base_parse_sink_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); static gboolean gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event); static void gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event); -static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event); -static gboolean gst_base_parse_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_base_parse_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_base_parse_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); -static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event); -static gboolean gst_base_parse_sink_query (GstPad * pad, GstQuery * query); +static gboolean gst_base_parse_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_base_parse_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query); -static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer); +static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); static void gst_base_parse_loop (GstPad * pad); -static gboolean gst_base_parse_check_frame (GstBaseParse * parse, - GstBaseParseFrame * frame, guint * framesize, gint * skipsize); static GstFlowReturn gst_base_parse_parse_frame (GstBaseParse * parse, GstBaseParseFrame * frame); @@ -413,8 +436,11 @@ static gint64 gst_base_parse_find_offset (GstBaseParse * parse, static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time, gint64 * _offset); -static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse, - gboolean push_only); +static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse); +static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse, + gboolean prev_head); + +static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse); static gboolean gst_base_parse_is_seekable (GstBaseParse * parse); @@ -428,6 +454,9 @@ gst_base_parse_clear_queues (GstBaseParse * parse) NULL); g_slist_free (parse->priv->buffers_pending); parse->priv->buffers_pending = NULL; + g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL); + g_slist_free (parse->priv->buffers_head); + parse->priv->buffers_head = NULL; g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL); g_slist_free (parse->priv->buffers_send); parse->priv->buffers_send = NULL; @@ -436,25 +465,26 @@ gst_base_parse_clear_queues (GstBaseParse * parse) g_list_free (parse->priv->detect_buffers); parse->priv->detect_buffers = NULL; parse->priv->detect_buffers_size = 0; + + g_queue_foreach (&parse->priv->queued_frames, + (GFunc) gst_base_parse_frame_free, NULL); + g_queue_clear (&parse->priv->queued_frames); + + gst_buffer_replace (&parse->priv->cache, NULL); + + g_list_foreach (parse->priv->pending_events, (GFunc) gst_event_unref, NULL); + g_list_free (parse->priv->pending_events); + parse->priv->pending_events = NULL; + parse->priv->pending_segment = FALSE; } static void gst_base_parse_finalize (GObject * object) { GstBaseParse *parse = GST_BASE_PARSE (object); - GstEvent **p_ev; g_object_unref (parse->priv->adapter); - if (parse->priv->pending_segment) { - p_ev = &parse->priv->pending_segment; - gst_event_replace (p_ev, NULL); - } - if (parse->priv->close_segment) { - p_ev = &parse->priv->close_segment; - gst_event_replace (p_ev, NULL); - } - if (parse->priv->cache) { gst_buffer_unref (parse->priv->cache); parse->priv->cache = NULL; @@ -464,17 +494,13 @@ gst_base_parse_finalize (GObject * object) NULL); g_list_free (parse->priv->pending_events); parse->priv->pending_events = NULL; - - g_queue_foreach (&parse->priv->queued_frames, - (GFunc) gst_base_parse_frame_free, NULL); - g_queue_clear (&parse->priv->queued_frames); + parse->priv->pending_segment = FALSE; if (parse->priv->index) { gst_object_unref (parse->priv->index); parse->priv->index = NULL; } - - g_static_mutex_free (&parse->priv->index_lock); + g_mutex_clear (&parse->priv->index_lock); gst_base_parse_clear_queues (parse); @@ -495,12 +521,14 @@ gst_base_parse_class_init (GstBaseParseClass * klass) gstelement_class = (GstElementClass *) klass; gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_base_parse_change_state); + +#if 0 gstelement_class->set_index = GST_DEBUG_FUNCPTR (gst_base_parse_set_index); gstelement_class->get_index = GST_DEBUG_FUNCPTR (gst_base_parse_get_index); +#endif /* Default handlers */ - klass->check_valid_frame = gst_base_parse_check_frame; - klass->parse_frame = gst_base_parse_parse_frame; + klass->sink_event = gst_base_parse_sink_eventfunc; klass->src_event = gst_base_parse_src_eventfunc; klass->convert = gst_base_parse_convert_default; @@ -529,10 +557,9 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass) GST_DEBUG_FUNCPTR (gst_base_parse_chain)); gst_pad_set_activate_function (parse->sinkpad, GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate)); - gst_pad_set_activatepush_function (parse->sinkpad, - GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_push)); - gst_pad_set_activatepull_function (parse->sinkpad, - GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_pull)); + gst_pad_set_activatemode_function (parse->sinkpad, + GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_mode)); + GST_PAD_SET_PROXY_ALLOCATION (parse->sinkpad); gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad); GST_DEBUG_OBJECT (parse, "sinkpad created"); @@ -553,13 +580,15 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass) parse->priv->adapter = gst_adapter_new (); - parse->priv->pad_mode = GST_PAD_ACTIVATE_NONE; + parse->priv->pad_mode = GST_PAD_MODE_NONE; - g_static_mutex_init (&parse->priv->index_lock); + g_mutex_init (&parse->priv->index_lock); /* init state */ gst_base_parse_reset (parse); GST_DEBUG_OBJECT (parse, "init ok"); + + GST_OBJECT_FLAG_SET (parse, GST_ELEMENT_FLAG_INDEXABLE); } static GstBaseParseFrame * @@ -593,21 +622,9 @@ gst_base_parse_frame_free (GstBaseParseFrame * frame) } } -GType -gst_base_parse_frame_get_type (void) -{ - static volatile gsize frame_type = 0; - - if (g_once_init_enter (&frame_type)) { - GType _type; - - _type = g_boxed_type_register_static ("GstBaseParseFrame", - (GBoxedCopyFunc) gst_base_parse_frame_copy, - (GBoxedFreeFunc) gst_base_parse_frame_free); - g_once_init_leave (&frame_type, _type); - } - return (GType) frame_type; -} +G_DEFINE_BOXED_TYPE (GstBaseParseFrame, gst_base_parse_frame, + (GBoxedCopyFunc) gst_base_parse_frame_copy, + (GBoxedFreeFunc) gst_base_parse_frame_free); /** * gst_base_parse_frame_init: @@ -642,8 +659,7 @@ gst_base_parse_frame_init (GstBaseParseFrame * frame) * then use gst_base_parse_frame_init() to initialise it. * * Returns: a newly-allocated #GstBaseParseFrame. Free with - * gst_base_parse_frame_free() when no longer needed, unless you gave - * away ownership to gst_base_parse_push_frame(). + * gst_base_parse_frame_free() when no longer needed. * * Since: 0.10.33 */ @@ -726,15 +742,11 @@ gst_base_parse_reset (GstBaseParse * parse) parse->priv->last_ts = GST_CLOCK_TIME_NONE; parse->priv->last_offset = 0; - if (parse->priv->pending_segment) { - gst_event_unref (parse->priv->pending_segment); - parse->priv->pending_segment = NULL; - } - g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref, NULL); g_list_free (parse->priv->pending_events); parse->priv->pending_events = NULL; + parse->priv->pending_segment = FALSE; if (parse->priv->cache) { gst_buffer_unref (parse->priv->cache); @@ -745,10 +757,10 @@ gst_base_parse_reset (GstBaseParse * parse) g_slist_free (parse->priv->pending_seeks); parse->priv->pending_seeks = NULL; - /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */ - parse->priv->frame._private_flags |= - GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (&parse->priv->frame); + if (parse->priv->adapter) + gst_adapter_clear (parse->priv->adapter); + + parse->priv->new_frame = TRUE; g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL); g_list_free (parse->priv->detect_buffers); @@ -757,27 +769,6 @@ gst_base_parse_reset (GstBaseParse * parse) GST_OBJECT_UNLOCK (parse); } -/* gst_base_parse_check_frame: - * @parse: #GstBaseParse. - * @buffer: GstBuffer. - * @framesize: This will be set to tell the found frame size in bytes. - * @skipsize: Output parameter that tells how much data needs to be skipped - * in order to find the following frame header. - * - * Default callback for check_valid_frame. - * - * Returns: Always TRUE. - */ -static gboolean -gst_base_parse_check_frame (GstBaseParse * parse, - GstBaseParseFrame * frame, guint * framesize, gint * skipsize) -{ - *framesize = gst_buffer_get_size (frame->buffer); - *skipsize = 0; - return TRUE; -} - - /* gst_base_parse_parse_frame: * @parse: #GstBaseParse. * @buffer: #GstBuffer. @@ -863,25 +854,26 @@ gst_base_parse_convert (GstBaseParse * parse, * Returns: TRUE if the event was handled. */ static gboolean -gst_base_parse_sink_event (GstPad * pad, GstEvent * event) +gst_base_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstBaseParse *parse; GstBaseParseClass *bclass; - gboolean handled = FALSE; - gboolean ret = TRUE; + gboolean ret; - parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); + parse = GST_BASE_PARSE (parent); bclass = GST_BASE_PARSE_GET_CLASS (parse); GST_DEBUG_OBJECT (parse, "handling event %d, %s", GST_EVENT_TYPE (event), GST_EVENT_TYPE_NAME (event)); - /* Cache all events except EOS, SEGMENT and FLUSH_STOP if we have a + /* Cache all serialized events except EOS, SEGMENT and FLUSH_STOP if we have a * pending segment */ - if (parse->priv->pending_segment && GST_EVENT_TYPE (event) != GST_EVENT_EOS + if (parse->priv->pending_segment && GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_EOS && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_START - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP + && GST_EVENT_TYPE (event) != GST_EVENT_CAPS) { if (GST_EVENT_TYPE (event) == GST_EVENT_TAG) /* See if any bitrate tags were posted */ @@ -891,24 +883,21 @@ gst_base_parse_sink_event (GstPad * pad, GstEvent * event) g_list_append (parse->priv->pending_events, event); ret = TRUE; } else { - if (GST_EVENT_TYPE (event) == GST_EVENT_EOS && parse->priv->framecount < MIN_FRAMES_TO_POST_BITRATE) /* We've not posted bitrate tags yet - do so now */ gst_base_parse_post_bitrates (parse, TRUE, TRUE, TRUE); - if (bclass->event) - handled = bclass->event (parse, event); - - if (!handled) - handled = gst_base_parse_sink_eventfunc (parse, event); - - if (!handled) - ret = gst_pad_event_default (pad, event); + if (bclass->sink_event) + ret = bclass->sink_event (parse, event); + else { + gst_event_unref (event); + ret = FALSE; + } } - gst_object_unref (parse); GST_DEBUG_OBJECT (parse, "event handled"); + return ret; } @@ -927,8 +916,7 @@ gst_base_parse_sink_event (GstPad * pad, GstEvent * event) static gboolean gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) { - gboolean handled = FALSE; - GstEvent **eventp; + gboolean ret; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_CAPS: @@ -942,11 +930,12 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps); if (klass->set_sink_caps) - klass->set_sink_caps (parse, caps); + ret = klass->set_sink_caps (parse, caps); + else + ret = TRUE; /* will send our own caps downstream */ gst_event_unref (event); - handled = TRUE; break; } case GST_EVENT_SEGMENT: @@ -1026,8 +1015,8 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) gst_event_unref (event); out_segment.start = 0; - out_segment.stop = GST_CLOCK_TIME_NONE;; - out_segment.time = 0;; + out_segment.stop = GST_CLOCK_TIME_NONE; + out_segment.time = 0; event = gst_event_new_segment (&out_segment); @@ -1049,17 +1038,17 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) /* save the segment for later, right before we push a new buffer so that * the caps are fixed and the next linked element can receive * the segment. */ - eventp = &parse->priv->pending_segment; - gst_event_replace (eventp, event); - gst_event_unref (event); - handled = TRUE; + parse->priv->pending_events = + g_list_append (parse->priv->pending_events, event); + parse->priv->pending_segment = TRUE; + ret = TRUE; /* but finish the current segment */ GST_DEBUG_OBJECT (parse, "draining current segment"); if (in_segment->rate > 0.0) gst_base_parse_drain (parse); else - gst_base_parse_process_fragment (parse, FALSE); + gst_base_parse_finish_fragment (parse, FALSE); gst_adapter_clear (parse->priv->adapter); parse->priv->offset = offset; @@ -1073,59 +1062,64 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) case GST_EVENT_FLUSH_START: parse->priv->flushing = TRUE; - handled = gst_pad_push_event (parse->srcpad, gst_event_ref (event)); - if (handled) - gst_event_unref (event); + ret = gst_pad_push_event (parse->srcpad, event); /* Wait for _chain() to exit by taking the srcpad STREAM_LOCK */ GST_PAD_STREAM_LOCK (parse->srcpad); GST_PAD_STREAM_UNLOCK (parse->srcpad); - break; case GST_EVENT_FLUSH_STOP: + ret = gst_pad_push_event (parse->srcpad, event); gst_adapter_clear (parse->priv->adapter); gst_base_parse_clear_queues (parse); parse->priv->flushing = FALSE; parse->priv->discont = TRUE; parse->priv->last_ts = GST_CLOCK_TIME_NONE; - parse->priv->frame._private_flags |= - GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (&parse->priv->frame); + parse->priv->new_frame = TRUE; break; case GST_EVENT_EOS: if (parse->segment.rate > 0.0) gst_base_parse_drain (parse); else - gst_base_parse_process_fragment (parse, FALSE); + gst_base_parse_finish_fragment (parse, TRUE); /* If we STILL have zero frames processed, fire an error */ if (parse->priv->framecount == 0) { GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE, ("No valid frames found before end of stream"), (NULL)); } - /* newsegment before eos */ - if (parse->priv->pending_segment) { - gst_pad_push_event (parse->srcpad, parse->priv->pending_segment); - parse->priv->pending_segment = NULL; + /* newsegment and other serialized events before eos */ + if (G_UNLIKELY (parse->priv->pending_events)) { + GList *l; + + for (l = parse->priv->pending_events; l != NULL; l = l->next) { + gst_pad_push_event (parse->srcpad, GST_EVENT (l->data)); + } + g_list_free (parse->priv->pending_events); + parse->priv->pending_events = NULL; + parse->priv->pending_segment = FALSE; } + ret = gst_pad_push_event (parse->srcpad, event); break; default: + ret = + gst_pad_event_default (parse->sinkpad, GST_OBJECT_CAST (parse), + event); break; } - - return handled; + return ret; } static gboolean -gst_base_parse_sink_query (GstPad * pad, GstQuery * query) +gst_base_parse_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { GstBaseParse *parse; GstBaseParseClass *bclass; gboolean res; - parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); + parse = GST_BASE_PARSE (parent); bclass = GST_BASE_PARSE_GET_CLASS (parse); switch (GST_QUERY_TYPE (query)) { @@ -1142,17 +1136,32 @@ gst_base_parse_sink_query (GstPad * pad, GstQuery * query) gst_caps_unref (caps); res = TRUE; - } else - res = gst_pad_peer_query (parse->srcpad, query); + } else { + GstCaps *caps, *template_caps, *filter; + + gst_query_parse_caps (query, &filter); + template_caps = gst_pad_get_pad_template_caps (pad); + if (filter != NULL) { + caps = + gst_caps_intersect_full (filter, template_caps, + GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (template_caps); + } else { + caps = template_caps; + } + gst_query_set_caps_result (query, caps); + gst_caps_unref (caps); + + res = TRUE; + } break; } default: { - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); break; } } - gst_object_unref (parse); return res; } @@ -1167,26 +1176,23 @@ gst_base_parse_sink_query (GstPad * pad, GstQuery * query) * Returns: TRUE if the event was handled. */ static gboolean -gst_base_parse_src_event (GstPad * pad, GstEvent * event) +gst_base_parse_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstBaseParse *parse; GstBaseParseClass *bclass; - gboolean handled = FALSE; gboolean ret = TRUE; - parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); + parse = GST_BASE_PARSE (parent); bclass = GST_BASE_PARSE_GET_CLASS (parse); GST_DEBUG_OBJECT (parse, "event %d, %s", GST_EVENT_TYPE (event), GST_EVENT_TYPE_NAME (event)); if (bclass->src_event) - handled = bclass->src_event (parse, event); - - if (!handled) - ret = gst_pad_event_default (pad, event); + ret = bclass->src_event (parse, event); + else + gst_event_unref (event); - gst_object_unref (parse); return ret; } @@ -1209,20 +1215,19 @@ gst_base_parse_is_seekable (GstBaseParse * parse) static gboolean gst_base_parse_src_eventfunc (GstBaseParse * parse, GstEvent * event) { - gboolean handled = FALSE; + gboolean res = FALSE; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - { - if (gst_base_parse_is_seekable (parse)) { - handled = gst_base_parse_handle_seek (parse, event); - } + if (gst_base_parse_is_seekable (parse)) + res = gst_base_parse_handle_seek (parse, event); break; - } default: + res = gst_pad_event_default (parse->srcpad, GST_OBJECT_CAST (parse), + event); break; } - return handled; + return res; } @@ -1265,13 +1270,13 @@ gst_base_parse_convert_default (GstBaseParse * parse, /* need at least some frames */ if (!parse->priv->framecount) - return FALSE; + goto no_framecount; duration = parse->priv->acc_duration / GST_MSECOND; bytes = parse->priv->bytecount; if (G_UNLIKELY (!duration || !bytes)) - return FALSE; + goto no_duration_bytes; if (src_format == GST_FORMAT_BYTES) { if (dest_format == GST_FORMAT_TIME) { @@ -1282,6 +1287,8 @@ gst_base_parse_convert_default (GstBaseParse * parse, GST_DEBUG_OBJECT (parse, "conversion result: %" G_GINT64_FORMAT " ms", *dest_value / GST_MSECOND); ret = TRUE; + } else { + GST_DEBUG_OBJECT (parse, "converting bytes -> other not implemented"); } } else if (src_format == GST_FORMAT_TIME) { if (dest_format == GST_FORMAT_BYTES) { @@ -1292,20 +1299,39 @@ gst_base_parse_convert_default (GstBaseParse * parse, "time %" G_GINT64_FORMAT " ms in bytes = %" G_GINT64_FORMAT, src_value / GST_MSECOND, *dest_value); ret = TRUE; + } else { + GST_DEBUG_OBJECT (parse, "converting time -> other not implemented"); } } else if (src_format == GST_FORMAT_DEFAULT) { /* DEFAULT == frame-based */ if (dest_format == GST_FORMAT_TIME) { + GST_DEBUG_OBJECT (parse, "converting default -> time"); if (parse->priv->fps_den) { *dest_value = gst_util_uint64_scale (src_value, GST_SECOND * parse->priv->fps_den, parse->priv->fps_num); ret = TRUE; } - } else if (dest_format == GST_FORMAT_BYTES) { + } else { + GST_DEBUG_OBJECT (parse, "converting default -> other not implemented"); } + } else { + GST_DEBUG_OBJECT (parse, "conversion not implemented"); } - return ret; + + /* ERRORS */ +no_framecount: + { + GST_DEBUG_OBJECT (parse, "no framecount"); + return FALSE; + } +no_duration_bytes: + { + GST_DEBUG_OBJECT (parse, "no duration %" G_GUINT64_FORMAT ", bytes %" + G_GUINT64_FORMAT, duration, bytes); + return FALSE; + } + } static void @@ -1467,14 +1493,6 @@ gst_base_parse_update_bitrates (GstBaseParse * parse, GstBaseParseFrame * frame) if ((update_min || update_avg || update_max)) gst_base_parse_post_bitrates (parse, update_min, update_avg, update_max); - /* If average bitrate changes that much and no valid (time) duration provided, - * then post a new duration message so applications can update their cached - * values */ - if (update_avg && !(parse->priv->duration_fmt == GST_FORMAT_TIME && - GST_CLOCK_TIME_IS_VALID (parse->priv->duration))) - gst_element_post_message (GST_ELEMENT (parse), - gst_message_new_duration (GST_OBJECT (parse), GST_FORMAT_TIME, -1)); - exit: return; } @@ -1551,11 +1569,12 @@ gst_base_parse_add_index_entry (GstBaseParse * parse, guint64 offset, associations[1].value = offset; /* index might change on-the-fly, although that would be nutty app ... */ - g_static_mutex_lock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_LOCK (parse); gst_index_add_associationv (parse->priv->index, parse->priv->index_id, - (key) ? GST_ASSOCIATION_FLAG_KEY_UNIT : GST_ASSOCIATION_FLAG_DELTA_UNIT, - 2, (const GstIndexAssociation *) &associations); - g_static_mutex_unlock (&parse->priv->index_lock); + (key) ? GST_INDEX_ASSOCIATION_FLAG_KEY_UNIT : + GST_INDEX_ASSOCIATION_FLAG_DELTA_UNIT, 2, + (const GstIndexAssociation *) &associations); + GST_BASE_PARSE_INDEX_UNLOCK (parse); if (key) { parse->priv->index_last_offset = offset; @@ -1588,7 +1607,7 @@ gst_base_parse_check_seekability (GstBaseParse * parse) /* try harder to query upstream size if we didn't get it the first time */ if (seekable && stop == -1) { GST_DEBUG_OBJECT (parse, "doing duration query to fix up unset stop"); - gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop); + gst_pad_peer_query_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop); } /* if upstream doesn't know the size, it's likely that it's not seekable in @@ -1626,7 +1645,7 @@ gst_base_parse_check_upstream (GstBaseParse * parse) { gint64 stop; - if (gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_TIME, &stop)) + if (gst_pad_peer_query_duration (parse->sinkpad, GST_FORMAT_TIME, &stop)) if (GST_CLOCK_TIME_IS_VALID (stop) && stop) { /* upstream has one, accept it also, and no further updates */ gst_base_parse_set_duration (parse, GST_FORMAT_TIME, stop, 0); @@ -1678,6 +1697,136 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame) } } +/* makes sure that @buf is properly prepared and decorated for passing + * to baseclass, and an equally setup frame is returned setup with @buf. + * Takes ownership of @buf. */ +static GstBaseParseFrame * +gst_base_parse_prepare_frame (GstBaseParse * parse, GstBuffer * buffer) +{ + GstBaseParseFrame *frame = NULL; + + buffer = gst_buffer_make_writable (buffer); + + GST_LOG_OBJECT (parse, + "preparing frame at offset %" G_GUINT64_FORMAT + " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, + GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer), + gst_buffer_get_size (buffer)); + + if (parse->priv->discont) { + GST_DEBUG_OBJECT (parse, "marking DISCONT"); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + parse->priv->discont = FALSE; + } + + GST_BUFFER_OFFSET (buffer) = parse->priv->offset; + + frame = gst_base_parse_frame_new (buffer, 0, 0); + + /* also ensure to update state flags */ + gst_base_parse_frame_update (parse, frame, buffer); + gst_buffer_unref (buffer); + + if (parse->priv->prev_offset != parse->priv->offset || parse->priv->new_frame) { + GST_LOG_OBJECT (parse, "marking as new frame"); + parse->priv->new_frame = FALSE; + frame->flags |= GST_BASE_PARSE_FRAME_FLAG_NEW_FRAME; + } + + frame->offset = parse->priv->prev_offset = parse->priv->offset; + + /* use default handler to provide initial (upstream) metadata */ + gst_base_parse_parse_frame (parse, frame); + + return frame; +} + +/* Wraps buffer in a frame and dispatches to subclass. + * Also manages data skipping and offset handling (including adapter flushing). + * Takes ownership of @buffer */ +static GstFlowReturn +gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer, + gint * skip, gint * flushed) +{ + GstBaseParseClass *klass = GST_BASE_PARSE_GET_CLASS (parse); + GstBaseParseFrame *frame; + GstFlowReturn ret; + + g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR); + + GST_LOG_OBJECT (parse, + "handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT + ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + + /* track what is being flushed during this single round of frame processing */ + parse->priv->flushed = 0; + *skip = 0; + + /* make it easy for _finish_frame to pick up input data */ + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { + gst_buffer_ref (buffer); + gst_adapter_push (parse->priv->adapter, buffer); + } + + frame = gst_base_parse_prepare_frame (parse, buffer); + ret = klass->handle_frame (parse, frame, skip); + + *flushed = parse->priv->flushed; + + GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d", + *skip, *flushed); + + if (ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret); + goto exit; + } + + /* subclass can only do one of these, or semantics are too unclear */ + g_assert (*skip == 0 || *flushed == 0); + + /* track skipping */ + if (*skip > 0) { + GstClockTime timestamp; + GstBuffer *outbuf; + + GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip); + if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { + /* reverse playback, and no frames found yet, so we are skipping + * the leading part of a fragment, which may form the tail of + * fragment coming later, hopefully subclass skips efficiently ... */ + timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); + outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip); + outbuf = gst_buffer_make_writable (outbuf); + GST_BUFFER_TIMESTAMP (outbuf) = timestamp; + parse->priv->buffers_head = + g_slist_prepend (parse->priv->buffers_head, outbuf); + outbuf = NULL; + } else { + gst_adapter_flush (parse->priv->adapter, *skip); + } + if (!parse->priv->discont) + parse->priv->sync_offset = parse->priv->offset; + parse->priv->offset += *skip; + parse->priv->discont = TRUE; + /* check for indefinite skipping */ + if (ret == GST_FLOW_OK) + ret = gst_base_parse_check_sync (parse); + } + + parse->priv->offset += *flushed; + +exit: + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { + gst_adapter_clear (parse->priv->adapter); + } + + gst_base_parse_frame_free (frame); + + return ret; +} + /* gst_base_parse_handle_and_push_buffer: * @parse: #GstBaseParse. * @klass: #GstBaseParseClass. @@ -1692,50 +1841,27 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame) */ static GstFlowReturn gst_base_parse_handle_and_push_frame (GstBaseParse * parse, - GstBaseParseClass * klass, GstBaseParseFrame * frame) + GstBaseParseFrame * frame) { - GstFlowReturn ret; gint64 offset; GstBuffer *buffer; g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); - buffer = frame->buffer; - - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - parse->priv->discont = FALSE; - } - /* some one-time start-up */ if (G_UNLIKELY (!parse->priv->framecount)) { gst_base_parse_check_seekability (parse); gst_base_parse_check_upstream (parse); } - GST_LOG_OBJECT (parse, - "parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, - GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer), - gst_buffer_get_size (buffer)); - - /* use default handler to provide initial (upstream) metadata */ - gst_base_parse_parse_frame (parse, frame); - - /* store offset as it might get overwritten */ - offset = GST_BUFFER_OFFSET (buffer); - ret = klass->parse_frame (parse, frame); - /* sync */ buffer = frame->buffer; - /* subclass must play nice */ - g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + offset = frame->offset; /* check if subclass/format can provide ts. * If so, that allows and enables extra seek and duration determining options */ - if (G_UNLIKELY (parse->priv->first_frame_offset < 0 && ret == GST_FLOW_OK)) { + if (G_UNLIKELY (parse->priv->first_frame_offset < 0)) { if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) && parse->priv->has_timing_info - && parse->priv->pad_mode == GST_PAD_ACTIVATE_PULL) { + && parse->priv->pad_mode == GST_PAD_MODE_PULL) { parse->priv->first_frame_offset = offset; parse->priv->first_frame_ts = GST_BUFFER_TIMESTAMP (buffer); GST_DEBUG_OBJECT (parse, "subclass provided ts %" GST_TIME_FORMAT @@ -1777,19 +1903,6 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, GST_BUFFER_TIMESTAMP (buffer), !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT), FALSE); - /* First buffers are dropped, this means that the subclass needs more - * frames to decide on the format and queues them internally */ - /* convert internal flow to OK and mark discont for the next buffer. */ - if (ret == GST_BASE_PARSE_FLOW_DROPPED) { - gst_base_parse_frame_free (frame); - return GST_FLOW_OK; - } else if (ret == GST_BASE_PARSE_FLOW_QUEUED) { - gst_base_parse_queue_frame (parse, frame); - return GST_FLOW_OK; - } else if (ret != GST_FLOW_OK) { - return ret; - } - /* All OK, push queued frames if there are any */ if (G_UNLIKELY (!g_queue_is_empty (&parse->priv->queued_frames))) { GstBaseParseFrame *queued_frame; @@ -1805,12 +1918,11 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, /** * gst_base_parse_push_frame: * @parse: #GstBaseParse. - * @frame: (transfer full): a #GstBaseParseFrame + * @frame: (transfer none): a #GstBaseParseFrame * - * Pushes the frame downstream, sends any pending events and - * does some timestamp and segment handling. Takes ownership - * of @frame and will clear it (if it was initialised with - * gst_base_parse_frame_init()) or free it. + * Pushes the frame's buffer downstream, sends any pending events and + * does some timestamp and segment handling. Takes ownership of + * frame's buffer, though caller retains ownership of @frame. * * This must be called with sinkpad STREAM_LOCK held. * @@ -1842,8 +1954,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); /* update stats */ - size = gst_buffer_get_size (buffer); - parse->priv->bytecount += size; + parse->priv->bytecount += frame->size; if (G_LIKELY (!(frame->flags & GST_BASE_PARSE_FRAME_FLAG_NO_FRAME))) { parse->priv->framecount++; if (GST_BUFFER_DURATION_IS_VALID (buffer)) { @@ -1864,29 +1975,31 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) last_stop = last_start + GST_BUFFER_DURATION (buffer); /* should have caps by now */ - g_return_val_if_fail (gst_pad_has_current_caps (parse->srcpad), - GST_FLOW_ERROR); + if (!gst_pad_has_current_caps (parse->srcpad)) + goto no_caps; - /* segment adjustment magic; only if we are running the whole show */ - if (!parse->priv->passthrough && parse->segment.rate > 0.0 && - (parse->priv->pad_mode == GST_PAD_ACTIVATE_PULL || - parse->priv->upstream_seekable)) { - /* segment times are typically estimates, - * actual frame data might lead subclass to different timestamps, - * so override segment start from what is supplied there */ - if (G_UNLIKELY (parse->priv->pending_segment && !parse->priv->exact_position - && GST_CLOCK_TIME_IS_VALID (last_start))) { - gst_event_unref (parse->priv->pending_segment); - parse->segment.start = - MIN ((guint64) last_start, (guint64) parse->segment.stop); + if (G_UNLIKELY (parse->priv->pending_segment)) { + /* have caps; check identity */ + gst_base_parse_check_media (parse); + } - GST_DEBUG_OBJECT (parse, - "adjusting pending segment start to %" GST_TIME_FORMAT, - GST_TIME_ARGS (parse->segment.start)); + /* Push pending events, including NEWSEGMENT events */ + if (G_UNLIKELY (parse->priv->pending_events)) { + GList *l; - parse->priv->pending_segment = gst_event_new_segment (&parse->segment); + for (l = parse->priv->pending_events; l != NULL; l = l->next) { + gst_pad_push_event (parse->srcpad, GST_EVENT (l->data)); } - /* handle gaps, e.g. non-zero start-time, in as much not handled by above */ + g_list_free (parse->priv->pending_events); + parse->priv->pending_events = NULL; + parse->priv->pending_segment = FALSE; + } + + /* segment adjustment magic; only if we are running the whole show */ + if (!parse->priv->passthrough && parse->segment.rate > 0.0 && + (parse->priv->pad_mode == GST_PAD_MODE_PULL || + parse->priv->upstream_seekable)) { + /* handle gaps */ if (GST_CLOCK_TIME_IS_VALID (parse->segment.position) && GST_CLOCK_TIME_IS_VALID (last_start)) { GstClockTimeDiff diff; @@ -1907,57 +2020,19 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) GST_TIME_ARGS (parse->segment.position), GST_TIME_ARGS (last_start)); - if (G_UNLIKELY (parse->priv->pending_segment)) { - gst_event_unref (parse->priv->pending_segment); - parse->segment.start = last_start; - parse->segment.time = last_start; - parse->priv->pending_segment = - gst_event_new_segment (&parse->segment); - } else { - /* skip gap FIXME */ - gst_pad_push_event (parse->srcpad, - gst_event_new_segment (&parse->segment)); - } + /* skip gap FIXME */ + gst_pad_push_event (parse->srcpad, + gst_event_new_segment (&parse->segment)); + parse->segment.position = last_start; } } } - /* and should then also be linked downstream, so safe to send some events */ - if (G_UNLIKELY (parse->priv->close_segment)) { - /* only set up by loop */ - GST_DEBUG_OBJECT (parse, "loop sending close segment"); - gst_pad_push_event (parse->srcpad, parse->priv->close_segment); - parse->priv->close_segment = NULL; - } - if (G_UNLIKELY (parse->priv->pending_segment)) { - GstEvent *pending_segment; - - pending_segment = parse->priv->pending_segment; - parse->priv->pending_segment = NULL; - - GST_DEBUG_OBJECT (parse, "%s push pending segment", - parse->priv->pad_mode == GST_PAD_ACTIVATE_PULL ? "loop" : "chain"); - gst_pad_push_event (parse->srcpad, pending_segment); - - /* have caps; check identity */ - gst_base_parse_check_media (parse); - } - /* update bitrates and optionally post corresponding tags * (following newsegment) */ gst_base_parse_update_bitrates (parse, frame); - if (G_UNLIKELY (parse->priv->pending_events)) { - GList *l; - - for (l = parse->priv->pending_events; l != NULL; l = l->next) { - gst_pad_push_event (parse->srcpad, GST_EVENT (l->data)); - } - g_list_free (parse->priv->pending_events); - parse->priv->pending_events = NULL; - } - if (klass->pre_push_frame) { ret = klass->pre_push_frame (parse, frame); } else { @@ -1965,12 +2040,20 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) } /* take final ownership of frame buffer */ - buffer = frame->buffer; - frame->buffer = NULL; + if (frame->out_buffer) { + buffer = frame->out_buffer; + frame->out_buffer = NULL; + gst_buffer_replace (&frame->buffer, NULL); + } else { + buffer = frame->buffer; + frame->buffer = NULL; + } /* subclass must play nice */ g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); + size = gst_buffer_get_size (buffer); + parse->priv->seen_keyframe |= parse->priv->is_video && !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); @@ -2022,7 +2105,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) /* if we are not sufficiently in control, let upstream decide on EOS */ if (ret == GST_FLOW_EOS && (parse->priv->passthrough || - (parse->priv->pad_mode == GST_PAD_ACTIVATE_PUSH && + (parse->priv->pad_mode == GST_PAD_MODE_PUSH && !parse->priv->upstream_seekable))) ret = GST_FLOW_OK; } @@ -2032,11 +2115,107 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) parse->segment.position < last_stop) parse->segment.position = last_stop; - gst_base_parse_frame_free (frame); - return ret; + + /* ERRORS */ +no_caps: + { + GST_ELEMENT_ERROR (parse, STREAM, DECODE, ("No caps set"), (NULL)); + return GST_FLOW_ERROR; + } } +/** + * gst_base_parse_finish_frame: + * @parse: a #GstBaseParse + * @frame: a #GstBaseParseFrame + * @size: consumed input data represented by frame + * + * Collects parsed data and pushes this downstream. + * Source pad caps must be set when this is called. + * + * If @frame's out_buffer is set, that will be used as subsequent frame data. + * Otherwise, @size samples will be taken from the input and used for output, + * and the output's metadata (timestamps etc) will be taken as (optionally) + * set by the subclass on @frame's (input) buffer (which is otherwise + * ignored for any but the above purpose/information). + * + * Note that the latter buffer is invalidated by this call, whereas the + * caller retains ownership of @frame. + * + * Returns: a #GstFlowReturn that should be escalated to caller (of caller) + * + * Since: 0.11.1 + */ +GstFlowReturn +gst_base_parse_finish_frame (GstBaseParse * parse, GstBaseParseFrame * frame, + gint size) +{ + GstFlowReturn ret = GST_FLOW_OK; + + g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (frame->buffer != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (size > 0 || frame->out_buffer, GST_FLOW_ERROR); + g_return_val_if_fail (gst_adapter_available (parse->priv->adapter) >= size, + GST_FLOW_ERROR); + + GST_LOG_OBJECT (parse, "finished frame at offset %" G_GUINT64_FORMAT ", " + "flushing size %d", frame->offset, size); + + if (parse->priv->scanning && frame->buffer) { + if (!parse->priv->scanned_frame) { + parse->priv->scanned_frame = gst_base_parse_frame_copy (frame); + } + goto exit; + } + + parse->priv->flushed += size; + + /* either PUSH or PULL mode arranges for adapter data */ + /* ensure output buffer */ + if (!frame->out_buffer) { + GstBuffer *src, *dest; + + frame->out_buffer = gst_adapter_take_buffer (parse->priv->adapter, size); + dest = frame->out_buffer; + src = frame->buffer; + GST_BUFFER_PTS (dest) = GST_BUFFER_PTS (src); + GST_BUFFER_DTS (dest) = GST_BUFFER_DTS (src); + GST_BUFFER_OFFSET (dest) = GST_BUFFER_OFFSET (src); + GST_BUFFER_DURATION (dest) = GST_BUFFER_DURATION (src); + GST_BUFFER_OFFSET_END (dest) = GST_BUFFER_OFFSET_END (src); + GST_MINI_OBJECT_FLAGS (dest) = GST_MINI_OBJECT_FLAGS (src); + } else { + gst_adapter_flush (parse->priv->adapter, size); + } + + /* use as input for subsequent processing */ + gst_buffer_replace (&frame->buffer, frame->out_buffer); + gst_buffer_unref (frame->out_buffer); + frame->out_buffer = NULL; + + /* mark input size consumed */ + frame->size = size; + + /* subclass might queue frames/data internally if it needs more + * frames to decide on the format, or might request us to queue here. */ + if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_DROP) { + gst_buffer_replace (&frame->buffer, NULL); + goto exit; + } else if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_QUEUE) { + GstBaseParseFrame *copy; + + copy = gst_base_parse_frame_copy (frame); + copy->flags &= ~GST_BASE_PARSE_FRAME_FLAG_QUEUE; + gst_base_parse_queue_frame (parse, copy); + goto exit; + } + + ret = gst_base_parse_handle_and_push_frame (parse, frame); + +exit: + return ret; +} /* gst_base_parse_drain: * @@ -2057,7 +2236,8 @@ gst_base_parse_drain (GstBaseParse * parse) if (!avail) break; - if (gst_base_parse_chain (parse->sinkpad, NULL) != GST_FLOW_OK) { + if (gst_base_parse_chain (parse->sinkpad, GST_OBJECT_CAST (parse), + NULL) != GST_FLOW_OK) { break; } @@ -2113,7 +2293,34 @@ gst_base_parse_send_buffers (GstBaseParse * parse) return ret; } -/* gst_base_parse_process_fragment: +/* gst_base_parse_start_fragment: + * + * Prepares for processing a reverse playback (forward) fragment + * by (re)setting proper state variables. + */ +static GstFlowReturn +gst_base_parse_start_fragment (GstBaseParse * parse) +{ + GST_LOG_OBJECT (parse, "starting fragment"); + + /* invalidate so no fall-back timestamping is performed; + * ok if taken from subclass or upstream */ + parse->priv->next_ts = GST_CLOCK_TIME_NONE; + parse->priv->prev_ts = GST_CLOCK_TIME_NONE; + /* prevent it hanging around stop all the time */ + parse->segment.position = GST_CLOCK_TIME_NONE; + /* mark next run */ + parse->priv->discont = TRUE; + + /* head of previous fragment is now pending tail of current fragment */ + parse->priv->buffers_pending = parse->priv->buffers_head; + parse->priv->buffers_head = NULL; + + return GST_FLOW_OK; +} + + +/* gst_base_parse_finish_fragment: * * Processes a reverse playback (forward) fragment: * - append head of last fragment that was skipped to current fragment data @@ -2122,40 +2329,35 @@ gst_base_parse_send_buffers (GstBaseParse * parse) * - push queued data */ static GstFlowReturn -gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only) +gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head) { GstBuffer *buf; GstFlowReturn ret = GST_FLOW_OK; gboolean seen_key = FALSE, seen_delta = FALSE; - if (push_only) - goto push; + GST_LOG_OBJECT (parse, "finishing fragment"); /* restore order */ parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending); while (parse->priv->buffers_pending) { buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data); - GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")", - gst_buffer_get_size (buf)); - gst_adapter_push (parse->priv->adapter, buf); + if (prev_head) { + GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")", + gst_buffer_get_size (buf)); + gst_adapter_push (parse->priv->adapter, buf); + } else { + GST_LOG_OBJECT (parse, "discarding head buffer"); + gst_buffer_unref (buf); + } parse->priv->buffers_pending = g_slist_delete_link (parse->priv->buffers_pending, parse->priv->buffers_pending); } - /* invalidate so no fall-back timestamping is performed; - * ok if taken from subclass or upstream */ - parse->priv->next_ts = GST_CLOCK_TIME_NONE; - /* prevent it hanging around stop all the time */ - parse->segment.position = GST_CLOCK_TIME_NONE; - /* mark next run */ - parse->priv->discont = TRUE; - /* chain looks for frames and queues resulting ones (in stead of pushing) */ /* initial skipped data is added to buffers_pending */ gst_base_parse_drain (parse); -push: if (parse->priv->buffers_send) { buf = GST_BUFFER_CAST (parse->priv->buffers_send->data); seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT); @@ -2202,12 +2404,11 @@ push: } seen_key = FALSE; } - } else { seen_delta = TRUE; + } else { + seen_key = TRUE; } - seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT); - parse->priv->buffers_send = g_slist_prepend (parse->priv->buffers_send, buf); parse->priv->buffers_queued = @@ -2244,21 +2445,19 @@ gst_base_parse_check_sync (GstBaseParse * parse) } static GstFlowReturn -gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) +gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstBaseParseClass *bclass; GstBaseParse *parse; GstFlowReturn ret = GST_FLOW_OK; - GstBuffer *outbuf = NULL; GstBuffer *tmpbuf = NULL; guint fsize = 1; gint skip = -1; const guint8 *data; - guint old_min_size = 0, min_size, av; + guint min_size, av; GstClockTime timestamp; - GstBaseParseFrame *frame; - parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad)); + parse = GST_BASE_PARSE (parent); bclass = GST_BASE_PARSE_GET_CLASS (parse); if (parse->priv->detecting) { @@ -2297,7 +2496,7 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) if (ret == GST_FLOW_OK && !parse->priv->flushing) ret = gst_base_parse_chain (GST_BASE_PARSE_SINK_PAD (parse), - GST_BUFFER_CAST (l->data)); + parent, GST_BUFFER_CAST (l->data)); else gst_buffer_unref (GST_BUFFER_CAST (l->data)); } @@ -2336,159 +2535,53 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) /* And now handle the current buffer if detection worked */ } - frame = &parse->priv->frame; - if (G_LIKELY (buffer)) { GST_LOG_OBJECT (parse, "buffer size: %" G_GSIZE_FORMAT ", offset = %" G_GINT64_FORMAT, gst_buffer_get_size (buffer), GST_BUFFER_OFFSET (buffer)); if (G_UNLIKELY (parse->priv->passthrough)) { - gst_base_parse_frame_init (frame); - frame->buffer = gst_buffer_make_writable (buffer); - return gst_base_parse_push_frame (parse, frame); + GstBaseParseFrame frame; + + gst_base_parse_frame_init (&frame); + frame.buffer = gst_buffer_make_writable (buffer); + ret = gst_base_parse_push_frame (parse, &frame); + gst_base_parse_frame_free (&frame); + return ret; } /* upstream feeding us in reverse playback; - * gather each fragment, then process it in single run */ + * finish previous fragment and start new upon DISCONT */ if (parse->segment.rate < 0.0) { if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment"); - ret = gst_base_parse_process_fragment (parse, FALSE); + ret = gst_base_parse_finish_fragment (parse, TRUE); + gst_base_parse_start_fragment (parse); } - gst_adapter_push (parse->priv->adapter, buffer); - return ret; } gst_adapter_push (parse->priv->adapter, buffer); } - if (G_UNLIKELY (buffer && - GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { - frame->_private_flags |= GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; - gst_base_parse_frame_free (frame); - } - /* Parse and push as many frames as possible */ /* Stop either when adapter is empty or we are flushing */ while (!parse->priv->flushing) { - gboolean res; - - /* maintain frame state for a single frame parsing round across _chain calls, - * so only init when needed */ - if (!frame->_private_flags) - gst_base_parse_frame_init (frame); - - tmpbuf = gst_buffer_new (); - - old_min_size = 0; - /* Synchronization loop */ - for (;;) { - /* note: if subclass indicates MAX fsize, - * this will not likely be available anyway ... */ - min_size = MAX (parse->priv->min_frame_size, fsize); - av = gst_adapter_available (parse->priv->adapter); - - /* loop safety check */ - if (G_UNLIKELY (old_min_size >= min_size)) - goto invalid_min; - old_min_size = min_size; - - if (G_UNLIKELY (parse->priv->drain)) { - min_size = av; - GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size); - if (G_UNLIKELY (!min_size)) { - gst_buffer_unref (tmpbuf); - goto done; - } - } - - /* Collect at least min_frame_size bytes */ - if (av < min_size) { - GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)", - av); - gst_buffer_unref (tmpbuf); - goto done; - } - - /* always pass all available data */ - data = gst_adapter_map (parse->priv->adapter, av); - gst_buffer_take_memory (tmpbuf, -1, - gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, - (gpointer) data, NULL, av, 0, av)); - GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset; + gint flush = 0; - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (tmpbuf, GST_BUFFER_FLAG_DISCONT); - } + /* note: if subclass indicates MAX fsize, + * this will not likely be available anyway ... */ + min_size = MAX (parse->priv->min_frame_size, fsize); + av = gst_adapter_available (parse->priv->adapter); - skip = -1; - gst_base_parse_frame_update (parse, frame, tmpbuf); - res = bclass->check_valid_frame (parse, frame, &fsize, &skip); - gst_adapter_unmap (parse->priv->adapter); - gst_buffer_replace (&frame->buffer, NULL); - gst_buffer_remove_memory_range (tmpbuf, 0, -1); - if (res) { - if (gst_adapter_available (parse->priv->adapter) < fsize) { - GST_DEBUG_OBJECT (parse, "found valid frame but not enough data" - " available (only %" G_GSIZE_FORMAT " bytes)", - gst_adapter_available (parse->priv->adapter)); - gst_buffer_unref (tmpbuf); - goto done; - } - GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip); - break; - } - if (skip == -1) { - /* subclass didn't touch this value. By default we skip 1 byte */ - skip = 1; - } - if (skip > 0) { - GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); - if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { - /* reverse playback, and no frames found yet, so we are skipping - * the leading part of a fragment, which may form the tail of - * fragment coming later, hopefully subclass skips efficiently ... */ - timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL); - outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip); - outbuf = gst_buffer_make_writable (outbuf); - GST_BUFFER_TIMESTAMP (outbuf) = timestamp; - parse->priv->buffers_pending = - g_slist_prepend (parse->priv->buffers_pending, outbuf); - outbuf = NULL; - } else { - gst_adapter_flush (parse->priv->adapter, skip); - } - parse->priv->offset += skip; - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->discont = TRUE; - /* something changed least; nullify loop check */ - old_min_size = 0; - } - /* skip == 0 should imply subclass set min_size to need more data; - * we check this shortly */ - if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { - gst_buffer_unref (tmpbuf); + if (G_UNLIKELY (parse->priv->drain)) { + min_size = av; + GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size); + if (G_UNLIKELY (!min_size)) { goto done; } } - gst_buffer_unref (tmpbuf); - tmpbuf = NULL; - - if (skip > 0) { - /* Subclass found the sync, but still wants to skip some data */ - GST_LOG_OBJECT (parse, "skipping %d bytes", skip); - gst_adapter_flush (parse->priv->adapter, skip); - parse->priv->offset += skip; - } - - /* Grab lock to prevent a race with FLUSH_START handler */ - GST_PAD_STREAM_LOCK (parse->srcpad); - /* FLUSH_START event causes the "flushing" flag to be set. In this - * case we can leave the frame pushing loop */ - if (parse->priv->flushing) { - GST_PAD_STREAM_UNLOCK (parse->srcpad); - break; + /* Collect at least min_frame_size bytes */ + if (av < min_size) { + GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)", av); + goto done; } /* move along with upstream timestamp (if any), @@ -2499,38 +2592,33 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) parse->priv->prev_ts = parse->priv->next_ts = timestamp; } - /* FIXME: Would it be more efficient to make a subbuffer instead? */ - outbuf = gst_adapter_take_buffer (parse->priv->adapter, fsize); - outbuf = gst_buffer_make_writable (outbuf); - - /* Subclass may want to know the data offset */ - GST_BUFFER_OFFSET (outbuf) = parse->priv->offset; - parse->priv->offset += fsize; - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - GST_BUFFER_DURATION (outbuf) = GST_CLOCK_TIME_NONE; + /* always pass all available data */ + data = gst_adapter_map (parse->priv->adapter, av); + /* arrange for actual data to be copied if subclass tries to, + * since what is passed is tied to the adapter */ + tmpbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY | + GST_MEMORY_FLAG_NO_SHARE, (gpointer) data, av, 0, av, NULL, NULL); - frame->buffer = outbuf; - ret = gst_base_parse_handle_and_push_frame (parse, bclass, frame); - GST_PAD_STREAM_UNLOCK (parse->srcpad); + /* keep the adapter mapped, so keep track of what has to be flushed */ + ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush); + tmpbuf = NULL; + /* probably already implicitly unmapped due to adapter operation, + * but for good measure ... */ + gst_adapter_unmap (parse->priv->adapter); if (ret != GST_FLOW_OK) { - GST_LOG_OBJECT (parse, "push returned %d", ret); - break; + goto done; + } + if (skip == 0 && flush == 0) { + GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, " + "breaking to get more data"); + goto done; } } done: GST_LOG_OBJECT (parse, "chain leaving"); return ret; - - /* ERRORS */ -invalid_min: - { - GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), - ("min_size evolution %d -> %d; breaking to avoid looping", - old_min_size, min_size)); - return GST_FLOW_ERROR; - } } /* pull @size bytes at current offset, @@ -2637,7 +2725,7 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse) if (parse->priv->exact_position) { offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL); } else { - if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts, + if (!gst_base_parse_convert (parse, GST_FORMAT_TIME, ts, GST_FORMAT_BYTES, &offset)) { GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based"); } @@ -2658,8 +2746,9 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse) /* offset will increase again as fragment is processed/parsed */ parse->priv->last_offset = offset; + gst_base_parse_start_fragment (parse); gst_adapter_push (parse->priv->adapter, buffer); - ret = gst_base_parse_process_fragment (parse, FALSE); + ret = gst_base_parse_finish_fragment (parse, TRUE); if (ret != GST_FLOW_OK) goto exit; @@ -2675,15 +2764,14 @@ exit: * ajusts sync, drain and offset going along */ static GstFlowReturn gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, - GstBaseParseFrame * frame, gboolean full) + gboolean full) { - GstBuffer *buffer, *outbuf; + GstBuffer *buffer; GstFlowReturn ret = GST_FLOW_OK; - guint fsize = 1, min_size, old_min_size = 0; + guint fsize, min_size; + gint flushed = 0; gint skip = 0; - g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR); - GST_LOG_OBJECT (parse, "scanning for frame at offset %" G_GUINT64_FORMAT " (%#" G_GINT64_MODIFIER "x)", parse->priv->offset, parse->priv->offset); @@ -2694,27 +2782,20 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, fsize = 64 * 1024; while (TRUE) { - gboolean res; - min_size = MAX (parse->priv->min_frame_size, fsize); - /* loop safety check */ - if (G_UNLIKELY (old_min_size >= min_size)) - goto invalid_min; - old_min_size = min_size; + + GST_LOG_OBJECT (parse, "reading buffer size %u", min_size); ret = gst_base_parse_pull_range (parse, min_size, &buffer); if (ret != GST_FLOW_OK) goto done; - if (parse->priv->discont) { - GST_DEBUG_OBJECT (parse, "marking DISCONT"); - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - } - /* if we got a short read, inform subclass we are draining leftover * and no more is to be expected */ - if (gst_buffer_get_size (buffer) < min_size) + if (gst_buffer_get_size (buffer) < min_size) { + GST_LOG_OBJECT (parse, "... but did not get that; marked draining"); parse->priv->drain = TRUE; + } if (parse->priv->detecting) { ret = klass->detect (parse, buffer); @@ -2740,84 +2821,35 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, /* Else handle this buffer normally */ } - skip = -1; - gst_base_parse_frame_update (parse, frame, buffer); - res = klass->check_valid_frame (parse, frame, &fsize, &skip); - gst_buffer_replace (&frame->buffer, NULL); - if (res) { - parse->priv->drain = FALSE; - GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip); + ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed); + if (ret != GST_FLOW_OK) break; - } - parse->priv->drain = FALSE; - if (skip == -1) - skip = 1; - if (skip > 0) { - GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip); - if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) { - /* reverse playback, and no frames found yet, so we are skipping - * the leading part of a fragment, which may form the tail of - * fragment coming later, hopefully subclass skips efficiently ... */ - outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip); - parse->priv->buffers_pending = - g_slist_prepend (parse->priv->buffers_pending, outbuf); - outbuf = NULL; - } - parse->priv->offset += skip; - if (!parse->priv->discont) - parse->priv->sync_offset = parse->priv->offset; - parse->priv->discont = TRUE; - /* something changed at least; nullify loop check */ - if (fsize == G_MAXUINT) - fsize = old_min_size + 64 * 1024; - old_min_size = 0; - } - /* skip == 0 should imply subclass set min_size to need more data; - * we check this shortly */ - GST_DEBUG_OBJECT (parse, "finding sync..."); - gst_buffer_unref (buffer); - if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) { - goto done; - } - } - - /* Does the subclass want to skip too? */ - if (skip > 0) - parse->priv->offset += skip; - else if (skip < 0) - skip = 0; - if (fsize + skip <= gst_buffer_get_size (buffer)) { - outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, skip, fsize); - GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer) + skip; - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - gst_buffer_unref (buffer); - } else { - gst_buffer_unref (buffer); - ret = gst_base_parse_pull_range (parse, fsize, &outbuf); - if (ret != GST_FLOW_OK) - goto done; - if (gst_buffer_get_size (outbuf) < fsize) { - gst_buffer_unref (outbuf); + /* something flushed means something happened, + * and we should bail out of this loop so as not to occupy + * the task thread indefinitely */ + if (flushed) { + GST_LOG_OBJECT (parse, "frame finished, breaking loop"); + break; + } + /* nothing flushed, no skip and draining, so nothing left to do */ + if (!skip && parse->priv->drain) { + GST_LOG_OBJECT (parse, "no activity or result when draining; " + "breaking loop and marking EOS"); ret = GST_FLOW_EOS; + break; + } + /* otherwise, get some more data + * note that is checked this does not happen indefinitely */ + if (!skip) { + GST_LOG_OBJECT (parse, "getting some more data"); + fsize += 64 * 1024; } + parse->priv->drain = FALSE; } - parse->priv->offset += fsize; - - frame->buffer = outbuf; - done: return ret; - - /* ERRORS */ -invalid_min: - { - GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), - ("min_size evolution %d -> %d; breaking to avoid looping", - old_min_size, min_size)); - return GST_FLOW_ERROR; - } } /* Loop that is used in pull mode to retrieve data from upstream */ @@ -2827,7 +2859,6 @@ gst_base_parse_loop (GstPad * pad) GstBaseParse *parse; GstBaseParseClass *klass; GstFlowReturn ret = GST_FLOW_OK; - GstBaseParseFrame frame; parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); klass = GST_BASE_PARSE_GET_CLASS (parse); @@ -2844,20 +2875,16 @@ gst_base_parse_loop (GstPad * pad) } } - gst_base_parse_frame_init (&frame); - ret = gst_base_parse_scan_frame (parse, klass, &frame, TRUE); + ret = gst_base_parse_scan_frame (parse, klass, TRUE); if (ret != GST_FLOW_OK) goto done; - /* This always cleans up frame, even if error occurs */ - ret = gst_base_parse_handle_and_push_frame (parse, klass, &frame); - /* eat expected eos signalling past segment in reverse playback */ if (parse->segment.rate < 0.0 && ret == GST_FLOW_EOS && parse->segment.position >= parse->segment.stop) { GST_DEBUG_OBJECT (parse, "downstream has reached end of segment"); /* push what was accumulated during loop run */ - gst_base_parse_process_fragment (parse, TRUE); + gst_base_parse_finish_fragment (parse, FALSE); /* force previous fragment */ parse->priv->offset = -1; ret = GST_FLOW_OK; @@ -2918,11 +2945,18 @@ pause: push_eos = TRUE; } if (push_eos) { - /* newsegment before eos */ - if (parse->priv->pending_segment) { - gst_pad_push_event (parse->srcpad, parse->priv->pending_segment); - parse->priv->pending_segment = NULL; + /* Push pending events, including NEWSEGMENT events */ + if (G_UNLIKELY (parse->priv->pending_events)) { + GList *l; + + for (l = parse->priv->pending_events; l != NULL; l = l->next) { + gst_pad_push_event (parse->srcpad, GST_EVENT (l->data)); + } + g_list_free (parse->priv->pending_events); + parse->priv->pending_events = NULL; + parse->priv->pending_segment = FALSE; } + gst_pad_push_event (parse->srcpad, gst_event_new_eos ()); } gst_object_unref (parse); @@ -2930,38 +2964,40 @@ pause: } static gboolean -gst_base_parse_sink_activate (GstPad * sinkpad) +gst_base_parse_sink_activate (GstPad * sinkpad, GstObject * parent) { GstBaseParse *parse; - gboolean result = TRUE; GstQuery *query; gboolean pull_mode; - parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad)); + parse = GST_BASE_PARSE (parent); GST_DEBUG_OBJECT (parse, "sink activate"); query = gst_query_new_scheduling (); - result = gst_pad_peer_query (sinkpad, query); - if (result) { - gst_query_parse_scheduling (query, &pull_mode, NULL, NULL, NULL, NULL, - NULL); - } else { - pull_mode = FALSE; + if (!gst_pad_peer_query (sinkpad, query)) { + gst_query_unref (query); + goto baseparse_push; } + + pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL); gst_query_unref (query); - if (pull_mode) { - GST_DEBUG_OBJECT (parse, "trying to activate in pull mode"); - result = gst_pad_activate_pull (sinkpad, TRUE); - } else { + if (!pull_mode) + goto baseparse_push; + + GST_DEBUG_OBJECT (parse, "trying to activate in pull mode"); + if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE)) + goto baseparse_push; + + return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop, + sinkpad); + /* fallback */ +baseparse_push: + { GST_DEBUG_OBJECT (parse, "trying to activate in push mode"); - result = gst_pad_activate_push (sinkpad, TRUE); + return gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE); } - - GST_DEBUG_OBJECT (parse, "sink activate return %d", result); - gst_object_unref (parse); - return result; } static gboolean @@ -2975,7 +3011,7 @@ gst_base_parse_activate (GstBaseParse * parse, gboolean active) klass = GST_BASE_PARSE_GET_CLASS (parse); if (active) { - if (parse->priv->pad_mode == GST_PAD_ACTIVATE_NONE && klass->start) + if (parse->priv->pad_mode == GST_PAD_MODE_NONE && klass->start) result = klass->start (parse); /* If the subclass implements ::detect we want to @@ -2987,71 +3023,60 @@ gst_base_parse_activate (GstBaseParse * parse, gboolean active) GST_PAD_STREAM_LOCK (parse->sinkpad); GST_PAD_STREAM_UNLOCK (parse->sinkpad); - if (parse->priv->pad_mode != GST_PAD_ACTIVATE_NONE && klass->stop) + if (parse->priv->pad_mode != GST_PAD_MODE_NONE && klass->stop) result = klass->stop (parse); - parse->priv->pad_mode = GST_PAD_ACTIVATE_NONE; + parse->priv->pad_mode = GST_PAD_MODE_NONE; } GST_DEBUG_OBJECT (parse, "activate return: %d", result); return result; } static gboolean -gst_base_parse_sink_activate_push (GstPad * pad, gboolean active) +gst_base_parse_sink_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) { - gboolean result = TRUE; + gboolean result; GstBaseParse *parse; - parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); + parse = GST_BASE_PARSE (parent); - GST_DEBUG_OBJECT (parse, "sink activate push %d", active); + GST_DEBUG_OBJECT (parse, "sink activate mode %d, %d", mode, active); - result = gst_base_parse_activate (parse, active); + if (!gst_base_parse_activate (parse, active)) + goto activate_failed; + switch (mode) { + case GST_PAD_MODE_PULL: + if (active) { + parse->priv->pending_events = + g_list_append (parse->priv->pending_events, + gst_event_new_segment (&parse->segment)); + parse->priv->pending_segment = TRUE; + result = TRUE; + } else { + result = gst_pad_stop_task (pad); + } + break; + default: + result = TRUE; + break; + } if (result) - parse->priv->pad_mode = - active ? GST_PAD_ACTIVATE_PUSH : GST_PAD_ACTIVATE_NONE; + parse->priv->pad_mode = active ? mode : GST_PAD_MODE_NONE; - GST_DEBUG_OBJECT (parse, "sink activate push return: %d", result); + GST_DEBUG_OBJECT (parse, "sink activate return: %d", result); - gst_object_unref (parse); return result; -} - -static gboolean -gst_base_parse_sink_activate_pull (GstPad * sinkpad, gboolean active) -{ - gboolean result = FALSE; - GstBaseParse *parse; - - parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad)); - GST_DEBUG_OBJECT (parse, "activate pull %d", active); - - result = gst_base_parse_activate (parse, active); - - if (result) { - if (active) { - parse->priv->pending_segment = gst_event_new_segment (&parse->segment); - result &= - gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop, - sinkpad); - } else { - result &= gst_pad_stop_task (sinkpad); - } + /* ERRORS */ +activate_failed: + { + GST_DEBUG_OBJECT (parse, "activate failed"); + return FALSE; } - - if (result) - parse->priv->pad_mode = - active ? GST_PAD_ACTIVATE_PULL : GST_PAD_ACTIVATE_NONE; - - GST_DEBUG_OBJECT (parse, "sink activate pull return: %d", result); - - gst_object_unref (parse); - return result; } - /** * gst_base_parse_set_duration: * @parse: #GstBaseParse. @@ -3302,6 +3327,8 @@ gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format, GST_LOG_OBJECT (parse, "using estimated duration"); *duration = parse->priv->estimated_duration; res = TRUE; + } else { + GST_LOG_OBJECT (parse, "cannot estimate duration"); } GST_LOG_OBJECT (parse, "res: %d, duration %" GST_TIME_FORMAT, res, @@ -3310,12 +3337,12 @@ gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format, } static gboolean -gst_base_parse_src_query (GstPad * pad, GstQuery * query) +gst_base_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query) { GstBaseParse *parse; gboolean res = FALSE; - parse = GST_BASE_PARSE (GST_PAD_PARENT (pad)); + parse = GST_BASE_PARSE (parent); GST_LOG_OBJECT (parse, "handling query: %" GST_PTR_FORMAT, query); @@ -3329,7 +3356,7 @@ gst_base_parse_src_query (GstPad * pad, GstQuery * query) gst_query_parse_position (query, &format, NULL); /* try upstream first */ - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); if (!res) { /* Fall back on interpreting segment */ GST_OBJECT_LOCK (parse); @@ -3363,7 +3390,7 @@ gst_base_parse_src_query (GstPad * pad, GstQuery * query) gst_query_parse_duration (query, &format, NULL); /* consult upstream */ - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); /* otherwise best estimate from us */ if (!res) { @@ -3383,7 +3410,7 @@ gst_base_parse_src_query (GstPad * pad, GstQuery * query) gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); /* consult upstream */ - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); /* we may be able to help if in TIME */ if (fmt == GST_FORMAT_TIME && gst_base_parse_is_seekable (parse)) { @@ -3452,7 +3479,7 @@ gst_base_parse_src_query (GstPad * pad, GstQuery * query) break; } default: - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); break; } return res; @@ -3469,7 +3496,7 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos, gboolean orig_drain, orig_discont; GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf = NULL; - GstBaseParseFrame frame; + GstBaseParseFrame *sframe = NULL; g_return_val_if_fail (pos != NULL, GST_FLOW_ERROR); g_return_val_if_fail (time != NULL, GST_FLOW_ERROR); @@ -3488,37 +3515,36 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos, GST_DEBUG_OBJECT (parse, "scanning for frame starting at %" G_GINT64_FORMAT " (%#" G_GINT64_MODIFIER "x)", *pos, *pos); - gst_base_parse_frame_init (&frame); - /* jump elsewhere and locate next frame */ parse->priv->offset = *pos; - ret = gst_base_parse_scan_frame (parse, klass, &frame, FALSE); - if (ret != GST_FLOW_OK) + /* mark as scanning so frames don't get processed all the way */ + parse->priv->scanning = TRUE; + ret = gst_base_parse_scan_frame (parse, klass, FALSE); + parse->priv->scanning = FALSE; + /* retrieve frame found during scan */ + sframe = parse->priv->scanned_frame; + parse->priv->scanned_frame = NULL; + + if (ret != GST_FLOW_OK || !sframe) goto done; - buf = frame.buffer; - GST_LOG_OBJECT (parse, - "peek parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, - GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET (buf), - gst_buffer_get_size (buf)); - /* get offset first, subclass parsing might dump other stuff in there */ - *pos = GST_BUFFER_OFFSET (buf); - ret = klass->parse_frame (parse, &frame); - buf = frame.buffer; + *pos = sframe->offset; + buf = sframe->buffer; + g_assert (buf); /* but it should provide proper time */ *time = GST_BUFFER_TIMESTAMP (buf); *duration = GST_BUFFER_DURATION (buf); - gst_base_parse_frame_free (&frame); - GST_LOG_OBJECT (parse, "frame with time %" GST_TIME_FORMAT " at offset %" G_GINT64_FORMAT, GST_TIME_ARGS (*time), *pos); done: + if (sframe) + gst_base_parse_frame_free (sframe); + /* restore state */ parse->priv->offset = orig_offset; parse->priv->discont = orig_discont; @@ -3544,6 +3570,9 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time, g_return_val_if_fail (_time != NULL, GST_FLOW_ERROR); g_return_val_if_fail (_offset != NULL, GST_FLOW_ERROR); + GST_DEBUG_OBJECT (parse, "Bisecting for time %" GST_TIME_FORMAT, + GST_TIME_ARGS (*_time)); + /* TODO also make keyframe aware if useful some day */ time = *_time; @@ -3566,13 +3595,21 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time, /* need initial positions; start and end */ lpos = parse->priv->first_frame_offset; ltime = parse->priv->first_frame_ts; - htime = parse->priv->duration; + if (!gst_base_parse_get_duration (parse, GST_FORMAT_TIME, &htime)) { + GST_DEBUG_OBJECT (parse, "Unknown time duration, cannot bisect"); + return GST_FLOW_ERROR; + } hpos = parse->priv->upstream_size; + GST_DEBUG_OBJECT (parse, + "Bisection initial bounds: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT + ", times %" GST_TIME_FORMAT " %" GST_TIME_FORMAT, lpos, htime, + GST_TIME_ARGS (ltime), GST_TIME_ARGS (htime)); + /* check preconditions are satisfied; * start and end are needed, except for special case where we scan for * last frame to determine duration */ - if (parse->priv->pad_mode != GST_PAD_ACTIVATE_PULL || !hpos || + if (parse->priv->pad_mode != GST_PAD_MODE_PULL || !hpos || !GST_CLOCK_TIME_IS_VALID (ltime) || (!GST_CLOCK_TIME_IS_VALID (htime) && time != G_MAXINT64)) { return GST_FLOW_OK; @@ -3677,13 +3714,13 @@ gst_base_parse_find_offset (GstBaseParse * parse, GstClockTime time, goto exit; } - g_static_mutex_lock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_LOCK (parse); if (parse->priv->index) { /* Let's check if we have an index entry for that time */ entry = gst_index_get_assoc_entry (parse->priv->index, parse->priv->index_id, before ? GST_INDEX_LOOKUP_BEFORE : GST_INDEX_LOOKUP_AFTER, - GST_ASSOCIATION_FLAG_KEY_UNIT, GST_FORMAT_TIME, time); + GST_INDEX_ASSOCIATION_FLAG_KEY_UNIT, GST_FORMAT_TIME, time); } if (entry) { @@ -3701,7 +3738,7 @@ gst_base_parse_find_offset (GstBaseParse * parse, GstClockTime time, ts = GST_CLOCK_TIME_NONE; } } - g_static_mutex_unlock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_UNLOCK (parse); exit: if (_ts) @@ -3732,7 +3769,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop)); /* no negative rates in push mode */ - if (rate < 0.0 && parse->priv->pad_mode == GST_PAD_ACTIVATE_PUSH) + if (rate < 0.0 && parse->priv->pad_mode == GST_PAD_MODE_PUSH) goto negative_rate; if (cur_type != GST_SEEK_TYPE_SET || @@ -3742,16 +3779,9 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) /* For any format other than TIME, see if upstream handles * it directly or fail. For TIME, try upstream, but do it ourselves if * it fails upstream */ - if (format != GST_FORMAT_TIME) { - /* default action delegates to upstream */ - res = FALSE; + res = gst_pad_push_event (parse->sinkpad, event); + if (format != GST_FORMAT_TIME || res) goto done; - } else { - gst_event_ref (event); - if ((res = gst_pad_push_event (parse->sinkpad, event))) { - goto done; - } - } /* get flush flag */ flush = flags & GST_SEEK_FLAG_FLUSH; @@ -3787,10 +3817,10 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) NULL); } else { start_ts = seeksegment.position; - if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.position, + if (!gst_base_parse_convert (parse, format, seeksegment.position, GST_FORMAT_BYTES, &seekpos)) goto convert_failed; - if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.stop, + if (!gst_base_parse_convert (parse, format, seeksegment.stop, GST_FORMAT_BYTES, &seekstop)) goto convert_failed; } @@ -3802,7 +3832,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) "seek stop %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, seeksegment.stop, seekstop); - if (parse->priv->pad_mode == GST_PAD_ACTIVATE_PULL) { + if (parse->priv->pad_mode == GST_PAD_MODE_PULL) { gint64 last_stop; GST_DEBUG_OBJECT (parse, "seek in PULL mode"); @@ -3844,11 +3874,11 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) memcpy (&parse->segment, &seeksegment, sizeof (GstSegment)); /* store the newsegment event so it can be sent from the streaming thread. */ - if (parse->priv->pending_segment) - gst_event_unref (parse->priv->pending_segment); - /* This will be sent later in _loop() */ - parse->priv->pending_segment = gst_event_new_segment (&parse->segment); + parse->priv->pending_segment = TRUE; + parse->priv->pending_events = + g_list_append (parse->priv->pending_events, + gst_event_new_segment (&parse->segment)); GST_DEBUG_OBJECT (parse, "Created newseg format %d, " "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT @@ -3938,9 +3968,6 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) } done: - /* handled event is ours to free */ - if (res) - gst_event_unref (event); return res; /* ERRORS */ @@ -3989,12 +4016,13 @@ gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event) } } +#if 0 static void gst_base_parse_set_index (GstElement * element, GstIndex * index) { GstBaseParse *parse = GST_BASE_PARSE (element); - g_static_mutex_lock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_LOCK (parse); if (parse->priv->index) gst_object_unref (parse->priv->index); if (index) { @@ -4005,7 +4033,7 @@ gst_base_parse_set_index (GstElement * element, GstIndex * index) } else { parse->priv->index = NULL; } - g_static_mutex_unlock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_UNLOCK (parse); } static GstIndex * @@ -4014,13 +4042,14 @@ gst_base_parse_get_index (GstElement * element) GstBaseParse *parse = GST_BASE_PARSE (element); GstIndex *result = NULL; - g_static_mutex_lock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_LOCK (parse); if (parse->priv->index) result = gst_object_ref (parse->priv->index); - g_static_mutex_unlock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_UNLOCK (parse); return result; } +#endif static GstStateChangeReturn gst_base_parse_change_state (GstElement * element, GstStateChange transition) @@ -4034,7 +4063,7 @@ gst_base_parse_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: /* If this is our own index destroy it as the * old entries might be wrong for the new stream */ - g_static_mutex_lock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_LOCK (parse); if (parse->priv->own_index) { gst_object_unref (parse->priv->index); parse->priv->index = NULL; @@ -4045,12 +4074,12 @@ gst_base_parse_change_state (GstElement * element, GstStateChange transition) if (G_UNLIKELY (!parse->priv->index)) { GST_DEBUG_OBJECT (parse, "no index provided creating our own"); - parse->priv->index = gst_index_factory_make ("memindex"); + parse->priv->index = g_object_new (gst_mem_index_get_type (), NULL); gst_index_get_writer_id (parse->priv->index, GST_OBJECT (parse), &parse->priv->index_id); parse->priv->own_index = TRUE; } - g_static_mutex_unlock (&parse->priv->index_lock); + GST_BASE_PARSE_INDEX_UNLOCK (parse); break; default: break;