X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstbaseparse.c;h=ae41523a474942c2342972ced28eea88d334d119;hb=4e7944b0b92f9b1f14fc4ed4a1cc02460af4487a;hp=7c9507e4f98356abdd2ecea7efdc34eff4124202;hpb=e34c26b509258c6407fc5d00ca6cfbd43da05e82;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c index 7c9507e..ae41523 100644 --- a/libs/gst/base/gstbaseparse.c +++ b/libs/gst/base/gstbaseparse.c @@ -2,6 +2,8 @@ * Copyright (C) 2008 Nokia Corporation. All rights reserved. * Contact: Stefan Kost * Copyright (C) 2008 Sebastian Dröge . + * Copyright (C) 2011, Hewlett-Packard Development Company, L.P. + * Author: Sebastian Dröge , Collabora Ltd. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -82,7 +84,11 @@ * contain a valid frame, this call must return FALSE and optionally * set the @skipsize value to inform base class that how many bytes * it needs to skip in order to find a valid frame. @framesize can always - * indicate a new minimum for current frame parsing. The passed buffer + * indicate a new minimum for current frame parsing. Indicating G_MAXUINT + * for requested amount means subclass simply needs best available + * subsequent data. In push mode this amounts to an additional input buffer + * (thus minimal additional latency), in pull mode this amounts to some + * arbitrary reasonable buffer size increase. The passed buffer * is read-only. Note that @check_valid_frame might receive any small * amount of input data when leftover data is being drained (e.g. at EOS). * @@ -100,7 +106,7 @@ * * Finally the buffer can be pushed downstream and the parsing loop starts * over again. Just prior to actually pushing the buffer in question, - * it is passed to @pre_push_buffer which gives subclass yet one + * it is passed to @pre_push_frame which gives subclass yet one * last chance to examine buffer metadata, or to send some custom (tag) * events, or to perform custom (segment) filtering. * @@ -212,7 +218,7 @@ static const GstFormat fmtlist[] = { GST_FORMAT_DEFAULT, GST_FORMAT_BYTES, GST_FORMAT_TIME, - 0 + GST_FORMAT_UNDEFINED }; #define GST_BASE_PARSE_GET_PRIVATE(obj) \ @@ -227,6 +233,7 @@ struct _GstBaseParsePrivate gint64 duration; GstFormat duration_fmt; gint64 estimated_duration; + gint64 estimated_drift; guint min_frame_size; gboolean passthrough; @@ -237,6 +244,7 @@ struct _GstBaseParsePrivate guint bitrate; guint lead_in, lead_out; GstClockTime lead_in_ts, lead_out_ts; + GstClockTime min_latency, max_latency; gboolean discont; gboolean flushing; @@ -306,6 +314,15 @@ struct _GstBaseParsePrivate /* Segment event that closes the running segment prior to SEEK */ GstEvent *close_segment; + + /* push mode helper frame */ + GstBaseParseFrame frame; + + /* TRUE if we're still detecting the format, i.e. + * if ::detect() is still called for future buffers */ + gboolean detecting; + GList *detect_buffers; + guint detect_buffers_size; }; typedef struct _GstBaseParseSeek @@ -368,8 +385,8 @@ static void gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event); static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event); static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event); -static gboolean gst_base_parse_query (GstPad * pad, GstQuery ** query); -static gboolean gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps); +static gboolean gst_base_parse_query (GstPad * pad, GstQuery * query); +static GstCaps *gst_base_parse_sink_getcaps (GstPad * pad, GstCaps * filter); static const GstQueryType *gst_base_parse_get_querytypes (GstPad * pad); static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer); @@ -414,6 +431,11 @@ gst_base_parse_clear_queues (GstBaseParse * parse) g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL); g_slist_free (parse->priv->buffers_send); parse->priv->buffers_send = NULL; + + g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL); + g_list_free (parse->priv->detect_buffers); + parse->priv->detect_buffers = NULL; + parse->priv->detect_buffers_size = 0; } static void @@ -501,8 +523,8 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass) parse->sinkpad = gst_pad_new_from_template (pad_template, "sink"); gst_pad_set_event_function (parse->sinkpad, GST_DEBUG_FUNCPTR (gst_base_parse_sink_event)); - gst_pad_set_setcaps_function (parse->sinkpad, - GST_DEBUG_FUNCPTR (gst_base_parse_sink_setcaps)); + gst_pad_set_getcaps_function (parse->sinkpad, + GST_DEBUG_FUNCPTR (gst_base_parse_sink_getcaps)); gst_pad_set_chain_function (parse->sinkpad, GST_DEBUG_FUNCPTR (gst_base_parse_chain)); gst_pad_set_activate_function (parse->sinkpad, @@ -566,8 +588,11 @@ gst_base_parse_frame_free (GstBaseParseFrame * frame) frame->buffer = NULL; } - if (!(frame->_private_flags & GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC)) + if (!(frame->_private_flags & GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC)) { g_slice_free (GstBaseParseFrame, frame); + } else { + memset (frame, 0, sizeof (*frame)); + } } GType @@ -677,6 +702,7 @@ gst_base_parse_reset (GstBaseParse * parse) parse->priv->first_frame_ts = GST_CLOCK_TIME_NONE; parse->priv->first_frame_offset = -1; parse->priv->estimated_duration = -1; + parse->priv->estimated_drift = 0; parse->priv->next_ts = 0; parse->priv->syncable = TRUE; parse->priv->passthrough = FALSE; @@ -720,6 +746,16 @@ gst_base_parse_reset (GstBaseParse * parse) g_slist_foreach (parse->priv->pending_seeks, (GFunc) g_free, NULL); g_slist_free (parse->priv->pending_seeks); parse->priv->pending_seeks = NULL; + + /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */ + parse->priv->frame._private_flags |= + GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; + gst_base_parse_frame_free (&parse->priv->frame); + + g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL); + g_list_free (parse->priv->detect_buffers); + parse->priv->detect_buffers = NULL; + parse->priv->detect_buffers_size = 0; GST_OBJECT_UNLOCK (parse); } @@ -897,6 +933,24 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) GstEvent **eventp; switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + { + GstCaps *caps; + GstBaseParseClass *klass; + + klass = GST_BASE_PARSE_GET_CLASS (parse); + + gst_event_parse_caps (event, &caps); + GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps); + + if (klass->set_sink_caps) + klass->set_sink_caps (parse, caps); + + /* will send our own caps downstream */ + gst_event_unref (event); + handled = TRUE; + break; + } case GST_EVENT_SEGMENT: { const GstSegment *in_segment; @@ -910,7 +964,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) gboolean update; #endif - in_segment = gst_event_get_segment (event); + gst_event_parse_segment (event, &in_segment); gst_segment_init (&out_segment, GST_FORMAT_TIME); GST_DEBUG_OBJECT (parse, "segment %" GST_SEGMENT_FORMAT, in_segment); @@ -965,7 +1019,7 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) event = gst_event_new_segment (&out_segment); - GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. " + GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. %" GST_SEGMENT_FORMAT, in_segment); } else if (in_segment->format != GST_FORMAT_TIME) { @@ -1036,6 +1090,9 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event) parse->priv->flushing = FALSE; parse->priv->discont = TRUE; parse->priv->last_ts = GST_CLOCK_TIME_NONE; + parse->priv->frame._private_flags |= + GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; + gst_base_parse_frame_free (&parse->priv->frame); break; case GST_EVENT_EOS: @@ -1224,15 +1281,25 @@ gst_base_parse_update_duration (GstBaseParse * baseparse) peer = gst_pad_get_peer (parse->sinkpad); if (peer) { - GstFormat pformat = GST_FORMAT_BYTES; gboolean qres = FALSE; gint64 ptot, dest_value; - qres = gst_pad_query_duration (peer, &pformat, &ptot); + qres = gst_pad_query_duration (peer, GST_FORMAT_BYTES, &ptot); gst_object_unref (GST_OBJECT (peer)); if (qres) { - if (gst_base_parse_convert (parse, pformat, ptot, + if (gst_base_parse_convert (parse, GST_FORMAT_BYTES, ptot, GST_FORMAT_TIME, &dest_value)) { + + /* inform if duration changed, but try to avoid spamming */ + parse->priv->estimated_drift += + dest_value - parse->priv->estimated_duration; + if (parse->priv->estimated_drift > GST_SECOND || + parse->priv->estimated_drift < -GST_SECOND) { + gst_element_post_message (GST_ELEMENT (parse), + gst_message_new_duration (GST_OBJECT (parse), + GST_FORMAT_TIME, dest_value)); + parse->priv->estimated_drift = 0; + } parse->priv->estimated_duration = dest_value; GST_LOG_OBJECT (parse, "updated estimated duration to %" GST_TIME_FORMAT, @@ -1249,7 +1316,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min, GstTagList *taglist = NULL; if (post_min && parse->priv->post_min_bitrate) { - taglist = gst_tag_list_new (); + taglist = gst_tag_list_new_empty (); gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_MINIMUM_BITRATE, parse->priv->min_bitrate, NULL); @@ -1257,7 +1324,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min, if (post_avg && parse->priv->post_avg_bitrate) { if (taglist == NULL) - taglist = gst_tag_list_new (); + taglist = gst_tag_list_new_empty (); parse->priv->posted_avg_bitrate = parse->priv->avg_bitrate; gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_BITRATE, @@ -1266,7 +1333,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min, if (post_max && parse->priv->post_max_bitrate) { if (taglist == NULL) - taglist = gst_tag_list_new (); + taglist = gst_tag_list_new_empty (); gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_MAXIMUM_BITRATE, parse->priv->max_bitrate, NULL); @@ -1475,7 +1542,7 @@ gst_base_parse_check_seekability (GstBaseParse * parse) guint idx_interval = 0; query = gst_query_new_seeking (GST_FORMAT_BYTES); - if (!gst_pad_peer_query (parse->sinkpad, &query)) { + if (!gst_pad_peer_query (parse->sinkpad, query)) { GST_DEBUG_OBJECT (parse, "seeking query failed"); goto done; } @@ -1484,10 +1551,8 @@ gst_base_parse_check_seekability (GstBaseParse * parse) /* try harder to query upstream size if we didn't get it the first time */ if (seekable && stop == -1) { - GstFormat fmt = GST_FORMAT_BYTES; - GST_DEBUG_OBJECT (parse, "doing duration query to fix up unset stop"); - gst_pad_query_peer_duration (parse->sinkpad, &fmt, &stop); + gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop); } /* if upstream doesn't know the size, it's likely that it's not seekable in @@ -1523,10 +1588,9 @@ done: static void gst_base_parse_check_upstream (GstBaseParse * parse) { - GstFormat fmt = GST_FORMAT_TIME; gint64 stop; - if (gst_pad_query_peer_duration (parse->sinkpad, &fmt, &stop)) + if (gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_TIME, &stop)) if (GST_CLOCK_TIME_IS_VALID (stop) && stop) { /* upstream has one, accept it also, and no further updates */ gst_base_parse_set_duration (parse, GST_FORMAT_TIME, stop, 0); @@ -1556,7 +1620,7 @@ gst_base_parse_check_media (GstBaseParse * parse) if (caps) gst_caps_unref (caps); - GST_DEBUG_OBJECT (parse, "media is video == %d", parse->priv->is_video); + GST_DEBUG_OBJECT (parse, "media is video: %d", parse->priv->is_video); } /* takes ownership of frame */ @@ -1616,7 +1680,7 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, GST_LOG_OBJECT (parse, "parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %d", + " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer), gst_buffer_get_size (buffer)); @@ -1696,7 +1760,6 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse, while ((queued_frame = g_queue_pop_head (&parse->priv->queued_frames))) { gst_base_parse_push_frame (parse, queued_frame); - gst_base_parse_frame_free (queued_frame); } } @@ -1737,7 +1800,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) buffer = frame->buffer; GST_LOG_OBJECT (parse, - "processing buffer of size %d with ts %" GST_TIME_FORMAT + "processing buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); @@ -1832,10 +1895,14 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) parse->priv->close_segment = NULL; } if (G_UNLIKELY (parse->priv->pending_segment)) { + GstEvent *pending_segment; + + pending_segment = parse->priv->pending_segment; + parse->priv->pending_segment = NULL; + GST_DEBUG_OBJECT (parse, "%s push pending segment", parse->priv->pad_mode == GST_ACTIVATE_PULL ? "loop" : "chain"); - gst_pad_push_event (parse->srcpad, parse->priv->pending_segment); - parse->priv->pending_segment = NULL; + gst_pad_push_event (parse->srcpad, pending_segment); /* have caps; check identity */ gst_base_parse_check_media (parse); @@ -1877,7 +1944,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) GST_BUFFER_TIMESTAMP (buffer) > parse->segment.stop + parse->priv->lead_out_ts) { GST_LOG_OBJECT (parse, "Dropped frame, after segment"); - ret = GST_FLOW_UNEXPECTED; + ret = GST_FLOW_EOS; } else if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) && GST_BUFFER_DURATION_IS_VALID (buffer) && GST_CLOCK_TIME_IS_VALID (parse->segment.start) && @@ -1901,9 +1968,10 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) ret = GST_FLOW_OK; } else if (ret == GST_FLOW_OK) { if (parse->segment.rate > 0.0) { + GST_LOG_OBJECT (parse, "pushing frame (%" G_GSIZE_FORMAT " bytes) now..", + size); ret = gst_pad_push (parse->srcpad, buffer); - GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) pushed: %s", - size, gst_flow_get_name (ret)); + GST_LOG_OBJECT (parse, "frame pushed, flow %s", gst_flow_get_name (ret)); } else { GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) queued for now", size); @@ -1912,11 +1980,11 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame) ret = GST_FLOW_OK; } } else { - gst_buffer_unref (buffer); GST_LOG_OBJECT (parse, "frame (%" G_GSIZE_FORMAT " bytes) not pushed: %s", size, gst_flow_get_name (ret)); + gst_buffer_unref (buffer); /* if we are not sufficiently in control, let upstream decide on EOS */ - if (ret == GST_FLOW_UNEXPECTED && + if (ret == GST_FLOW_EOS && (parse->priv->passthrough || (parse->priv->pad_mode == GST_ACTIVATE_PUSH && !parse->priv->upstream_seekable))) @@ -2031,7 +2099,7 @@ gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only) parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending); while (parse->priv->buffers_pending) { buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data); - GST_LOG_OBJECT (parse, "adding pending buffer (size %d)", + GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")", gst_buffer_get_size (buf)); gst_adapter_push (parse->priv->adapter, buf); parse->priv->buffers_pending = @@ -2117,7 +2185,7 @@ push: /* any trailing unused no longer usable (ideally none) */ if (G_UNLIKELY (gst_adapter_available (parse->priv->adapter))) { - GST_DEBUG_OBJECT (parse, "discarding %d trailing bytes", + GST_DEBUG_OBJECT (parse, "discarding %" G_GSIZE_FORMAT " trailing bytes", gst_adapter_available (parse->priv->adapter)); gst_adapter_clear (parse->priv->adapter); } @@ -2152,19 +2220,94 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) const guint8 *data; guint old_min_size = 0, min_size, av; GstClockTime timestamp; - GstBaseParseFrame _frame; GstBaseParseFrame *frame; parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad)); bclass = GST_BASE_PARSE_GET_CLASS (parse); - frame = &_frame; - gst_base_parse_frame_init (frame); + if (parse->priv->detecting) { + GstBuffer *detect_buf; + + if (parse->priv->detect_buffers_size == 0) { + detect_buf = gst_buffer_ref (buffer); + } else { + GList *l; + guint offset = 0; + + detect_buf = gst_buffer_new (); + + for (l = parse->priv->detect_buffers; l; l = l->next) { + gsize tmpsize = gst_buffer_get_size (l->data); + + gst_buffer_copy_into (detect_buf, GST_BUFFER_CAST (l->data), + GST_BUFFER_COPY_MEMORY, offset, tmpsize); + offset += tmpsize; + } + if (buffer) + gst_buffer_copy_into (detect_buf, buffer, GST_BUFFER_COPY_MEMORY, + offset, gst_buffer_get_size (buffer)); + } + + ret = bclass->detect (parse, detect_buf); + gst_buffer_unref (detect_buf); + + if (ret == GST_FLOW_OK) { + GList *l; + + /* Detected something */ + parse->priv->detecting = FALSE; + + for (l = parse->priv->detect_buffers; l; l = l->next) { + if (ret == GST_FLOW_OK && !parse->priv->flushing) + ret = + gst_base_parse_chain (GST_BASE_PARSE_SINK_PAD (parse), + GST_BUFFER_CAST (l->data)); + else + gst_buffer_unref (GST_BUFFER_CAST (l->data)); + } + g_list_free (parse->priv->detect_buffers); + parse->priv->detect_buffers = NULL; + parse->priv->detect_buffers_size = 0; + + if (ret != GST_FLOW_OK) { + return ret; + } + + /* Handle the current buffer */ + } else if (ret == GST_FLOW_NOT_NEGOTIATED) { + /* Still detecting, append buffer or error out if draining */ + + if (parse->priv->drain) { + GST_DEBUG_OBJECT (parse, "Draining but did not detect format yet"); + return GST_FLOW_ERROR; + } else if (parse->priv->flushing) { + g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, + NULL); + g_list_free (parse->priv->detect_buffers); + parse->priv->detect_buffers = NULL; + parse->priv->detect_buffers_size = 0; + } else { + parse->priv->detect_buffers = + g_list_append (parse->priv->detect_buffers, buffer); + parse->priv->detect_buffers_size += gst_buffer_get_size (buffer); + return GST_FLOW_OK; + } + } else { + /* Something went wrong, subclass responsible for error reporting */ + return ret; + } + + /* And now handle the current buffer if detection worked */ + } + + frame = &parse->priv->frame; if (G_LIKELY (buffer)) { - GST_LOG_OBJECT (parse, "buffer size: %d, offset = %" G_GINT64_FORMAT, + GST_LOG_OBJECT (parse, + "buffer size: %" G_GSIZE_FORMAT ", offset = %" G_GINT64_FORMAT, gst_buffer_get_size (buffer), GST_BUFFER_OFFSET (buffer)); if (G_UNLIKELY (parse->priv->passthrough)) { + gst_base_parse_frame_init (frame); frame->buffer = gst_buffer_make_writable (buffer); return gst_base_parse_push_frame (parse, frame); } @@ -2181,16 +2324,29 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) gst_adapter_push (parse->priv->adapter, buffer); } + if (G_UNLIKELY (buffer && + GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { + frame->_private_flags |= GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC; + gst_base_parse_frame_free (frame); + } + /* Parse and push as many frames as possible */ /* Stop either when adapter is empty or we are flushing */ while (!parse->priv->flushing) { gboolean res; + /* maintain frame state for a single frame parsing round across _chain calls, + * so only init when needed */ + if (!frame->_private_flags) + gst_base_parse_frame_init (frame); + tmpbuf = gst_buffer_new (); old_min_size = 0; /* Synchronization loop */ for (;;) { + /* note: if subclass indicates MAX fsize, + * this will not likely be available anyway ... */ min_size = MAX (parse->priv->min_frame_size, fsize); av = gst_adapter_available (parse->priv->adapter); @@ -2218,9 +2374,9 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) /* always pass all available data */ data = gst_adapter_map (parse->priv->adapter, av); - gst_buffer_take_memory (tmpbuf, + gst_buffer_take_memory (tmpbuf, -1, gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, - (gpointer) data, NULL, min_size, 0, min_size)); + (gpointer) data, NULL, av, 0, av)); GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset; if (parse->priv->discont) { @@ -2233,10 +2389,11 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer) res = bclass->check_valid_frame (parse, frame, &fsize, &skip); gst_adapter_unmap (parse->priv->adapter, 0); gst_buffer_replace (&frame->buffer, NULL); + gst_buffer_remove_memory_range (tmpbuf, 0, -1); if (res) { if (gst_adapter_available (parse->priv->adapter) < fsize) { - GST_DEBUG_OBJECT (parse, - "found valid frame but not enough data available (only %d bytes)", + GST_DEBUG_OBJECT (parse, "found valid frame but not enough data" + " available (only %" G_GSIZE_FORMAT " bytes)", gst_adapter_available (parse->priv->adapter)); gst_buffer_unref (tmpbuf); goto done; @@ -2401,8 +2558,8 @@ gst_base_parse_pull_range (GstBaseParse * parse, guint size, if (gst_buffer_get_size (parse->priv->cache) < size) { GST_DEBUG_OBJECT (parse, "Returning short buffer at offset %" - G_GUINT64_FORMAT ": wanted %u bytes, got %u bytes", parse->priv->offset, - size, gst_buffer_get_size (parse->priv->cache)); + G_GUINT64_FORMAT ": wanted %u bytes, got %" G_GSIZE_FORMAT " bytes", + parse->priv->offset, size, gst_buffer_get_size (parse->priv->cache)); *buffer = parse->priv->cache; parse->priv->cache = NULL; @@ -2432,7 +2589,7 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse) if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) { GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT, GST_TIME_ARGS (parse->segment.start)); - ret = GST_FLOW_UNEXPECTED; + ret = GST_FLOW_EOS; goto exit; } @@ -2444,10 +2601,8 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse) if (parse->priv->exact_position) { offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL); } else { - GstFormat dstformat = GST_FORMAT_BYTES; - if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts, - &dstformat, &offset)) { + GST_FORMAT_BYTES, &offset)) { GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based"); } } @@ -2496,6 +2651,12 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, GST_LOG_OBJECT (parse, "scanning for frame at offset %" G_GUINT64_FORMAT " (%#" G_GINT64_MODIFIER "x)", parse->priv->offset, parse->priv->offset); + /* let's make this efficient for all subclass once and for all; + * maybe it does not need this much, but in the latter case, we know we are + * in pull mode here and might as well try to read and supply more anyway + * (so does the buffer caching mechanism) */ + fsize = 64 * 1024; + while (TRUE) { gboolean res; @@ -2519,6 +2680,30 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, if (gst_buffer_get_size (buffer) < min_size) parse->priv->drain = TRUE; + if (parse->priv->detecting) { + ret = klass->detect (parse, buffer); + if (ret == GST_FLOW_NOT_NEGOTIATED) { + /* If draining we error out, otherwise request a buffer + * with 64kb more */ + if (parse->priv->drain) { + gst_buffer_unref (buffer); + GST_ERROR_OBJECT (parse, "Failed to detect format but draining"); + return GST_FLOW_ERROR; + } else { + fsize += 64 * 1024; + gst_buffer_unref (buffer); + continue; + } + } else if (ret != GST_FLOW_OK) { + gst_buffer_unref (buffer); + GST_ERROR_OBJECT (parse, "detect() returned %s", + gst_flow_get_name (ret)); + return ret; + } + + /* Else handle this buffer normally */ + } + skip = -1; gst_base_parse_frame_update (parse, frame, buffer); res = klass->check_valid_frame (parse, frame, &fsize, &skip); @@ -2546,7 +2731,9 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, if (!parse->priv->discont) parse->priv->sync_offset = parse->priv->offset; parse->priv->discont = TRUE; - /* something changed least; nullify loop check */ + /* something changed at least; nullify loop check */ + if (fsize == G_MAXUINT) + fsize = old_min_size + 64 * 1024; old_min_size = 0; } /* skip == 0 should imply subclass set min_size to need more data; @@ -2576,7 +2763,7 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass, goto done; if (gst_buffer_get_size (outbuf) < fsize) { gst_buffer_unref (outbuf); - ret = GST_FLOW_UNEXPECTED; + ret = GST_FLOW_EOS; } } @@ -2630,7 +2817,7 @@ gst_base_parse_loop (GstPad * pad) ret = gst_base_parse_handle_and_push_frame (parse, klass, &frame); /* eat expected eos signalling past segment in reverse playback */ - if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED && + if (parse->segment.rate < 0.0 && ret == GST_FLOW_EOS && parse->segment.position >= parse->segment.stop) { GST_DEBUG_OBJECT (parse, "downstream has reached end of segment"); /* push what was accumulated during loop run */ @@ -2641,7 +2828,7 @@ gst_base_parse_loop (GstPad * pad) } done: - if (ret == GST_FLOW_UNEXPECTED) + if (ret == GST_FLOW_EOS) goto eos; else if (ret != GST_FLOW_OK) goto pause; @@ -2652,7 +2839,7 @@ done: /* ERRORS */ eos: { - ret = GST_FLOW_UNEXPECTED; + ret = GST_FLOW_EOS; GST_DEBUG_OBJECT (parse, "eos"); /* fall-through */ } @@ -2664,7 +2851,7 @@ pause: gst_flow_get_name (ret)); gst_pad_pause_task (parse->sinkpad); - if (ret == GST_FLOW_UNEXPECTED) { + if (ret == GST_FLOW_EOS) { /* handle end-of-stream/segment */ if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) { gint64 stop; @@ -2686,7 +2873,7 @@ pause: } push_eos = TRUE; } - } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) { + } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) { /* for fatal errors we post an error message, wrong-state is * not fatal because it happens due to flushes and only means * that we should stop now. */ @@ -2711,12 +2898,24 @@ gst_base_parse_sink_activate (GstPad * sinkpad) { GstBaseParse *parse; gboolean result = TRUE; + GstQuery *query; + gboolean pull_mode; parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad)); GST_DEBUG_OBJECT (parse, "sink activate"); - if (gst_pad_check_pull_range (sinkpad)) { + query = gst_query_new_scheduling (); + result = gst_pad_peer_query (sinkpad, query); + if (result) { + gst_query_parse_scheduling (query, &pull_mode, NULL, NULL, NULL, NULL, + NULL); + } else { + pull_mode = FALSE; + } + gst_query_unref (query); + + if (pull_mode) { GST_DEBUG_OBJECT (parse, "trying to activate in pull mode"); result = gst_pad_activate_pull (sinkpad, TRUE); } else { @@ -2733,7 +2932,7 @@ static gboolean gst_base_parse_activate (GstBaseParse * parse, gboolean active) { GstBaseParseClass *klass; - gboolean result = FALSE; + gboolean result = TRUE; GST_DEBUG_OBJECT (parse, "activate %d", active); @@ -2742,6 +2941,10 @@ gst_base_parse_activate (GstBaseParse * parse, gboolean active) if (active) { if (parse->priv->pad_mode == GST_ACTIVATE_NONE && klass->start) result = klass->start (parse); + + /* If the subclass implements ::detect we want to + * call it for the first buffers now */ + parse->priv->detecting = (klass->detect != NULL); } else { /* We must make sure streaming has finished before resetting things * and calling the ::stop vfunc */ @@ -3002,9 +3205,9 @@ gst_base_parse_set_syncable (GstBaseParse * parse, gboolean syncable) * parsing, and the parser should operate in passthrough mode (which only * applies when operating in push mode). That is, incoming buffers are * pushed through unmodified, i.e. no @check_valid_frame or @parse_frame - * callbacks will be invoked, but @pre_push_buffer will still be invoked, + * callbacks will be invoked, but @pre_push_frame will still be invoked, * so subclass can perform as much or as little is appropriate for - * passthrough semantics in @pre_push_buffer. + * passthrough semantics in @pre_push_frame. * * Since: 0.10.33 */ @@ -3015,6 +3218,31 @@ gst_base_parse_set_passthrough (GstBaseParse * parse, gboolean passthrough) GST_INFO_OBJECT (parse, "passthrough: %s", (passthrough) ? "yes" : "no"); } +/** + * gst_base_parse_set_latency: + * @parse: a #GstBaseParse + * @min_latency: minimum parse latency + * @max_latency: maximum parse latency + * + * Sets the minimum and maximum (which may likely be equal) latency introduced + * by the parsing process. If there is such a latency, which depends on the + * particular parsing of the format, it typically corresponds to 1 frame duration. + * + * Since: 0.10.34 + */ +void +gst_base_parse_set_latency (GstBaseParse * parse, GstClockTime min_latency, + GstClockTime max_latency) +{ + GST_OBJECT_LOCK (parse); + parse->priv->min_latency = min_latency; + parse->priv->max_latency = max_latency; + GST_OBJECT_UNLOCK (parse); + GST_INFO_OBJECT (parse, "min/max latency %" GST_TIME_FORMAT ", %" + GST_TIME_FORMAT, GST_TIME_ARGS (min_latency), + GST_TIME_ARGS (max_latency)); +} + static gboolean gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format, GstClockTime * duration) @@ -3052,14 +3280,14 @@ gst_base_parse_get_querytypes (GstPad * pad) GST_QUERY_FORMATS, GST_QUERY_SEEKING, GST_QUERY_CONVERT, - 0 + GST_QUERY_NONE }; return list; } static gboolean -gst_base_parse_query (GstPad * pad, GstQuery ** query) +gst_base_parse_query (GstPad * pad, GstQuery * query) { GstBaseParse *parse; gboolean res = FALSE; @@ -3068,37 +3296,38 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) GST_LOG_OBJECT (parse, "handling query: %" GST_PTR_FORMAT, query); - switch (GST_QUERY_TYPE (*query)) { + switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: { gint64 dest_value; GstFormat format; GST_DEBUG_OBJECT (parse, "position query"); - gst_query_parse_position (*query, &format, NULL); - - GST_OBJECT_LOCK (parse); - if (format == GST_FORMAT_BYTES) { - dest_value = parse->priv->offset; - res = TRUE; - } else if (format == parse->segment.format && - GST_CLOCK_TIME_IS_VALID (parse->segment.position)) { - dest_value = parse->segment.position; - res = TRUE; - } - GST_OBJECT_UNLOCK (parse); + gst_query_parse_position (query, &format, NULL); - if (res) { - *query = gst_query_make_writable (*query); - gst_query_set_position (*query, format, dest_value); - } else { - res = gst_pad_query_default (pad, query); + /* try upstream first */ + res = gst_pad_query_default (pad, query); + if (!res) { + /* Fall back on interpreting segment */ + GST_OBJECT_LOCK (parse); + if (format == GST_FORMAT_BYTES) { + dest_value = parse->priv->offset; + res = TRUE; + } else if (format == parse->segment.format && + GST_CLOCK_TIME_IS_VALID (parse->segment.position)) { + dest_value = gst_segment_to_stream_time (&parse->segment, + parse->segment.format, parse->segment.position); + res = TRUE; + } + GST_OBJECT_UNLOCK (parse); if (!res) { /* no precise result, upstream no idea either, then best estimate */ /* priv->offset is updated in both PUSH/PULL modes */ res = gst_base_parse_convert (parse, GST_FORMAT_BYTES, parse->priv->offset, format, &dest_value); } + if (res) + gst_query_set_position (query, format, dest_value); } break; } @@ -3108,7 +3337,7 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) GstClockTime duration; GST_DEBUG_OBJECT (parse, "duration query"); - gst_query_parse_duration (*query, &format, NULL); + gst_query_parse_duration (query, &format, NULL); /* consult upstream */ res = gst_pad_query_default (pad, query); @@ -3116,10 +3345,8 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) /* otherwise best estimate from us */ if (!res) { res = gst_base_parse_get_duration (parse, format, &duration); - if (res) { - *query = gst_query_make_writable (*query); - gst_query_set_duration (*query, format, duration); - } + if (res) + gst_query_set_duration (query, format, duration); } break; } @@ -3130,14 +3357,14 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) gboolean seekable = FALSE; GST_DEBUG_OBJECT (parse, "seeking query"); - gst_query_parse_seeking (*query, &fmt, NULL, NULL, NULL); + gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); /* consult upstream */ res = gst_pad_query_default (pad, query); /* we may be able to help if in TIME */ if (fmt == GST_FORMAT_TIME && gst_base_parse_is_seekable (parse)) { - gst_query_parse_seeking (*query, &fmt, &seekable, NULL, NULL); + gst_query_parse_seeking (query, &fmt, &seekable, NULL, NULL); /* already OK if upstream takes care */ GST_LOG_OBJECT (parse, "upstream handled %d, seekable %d", res, seekable); @@ -3152,17 +3379,14 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) GST_LOG_OBJECT (parse, "already determine upstream seekabled: %d", seekable); } - *query = gst_query_make_writable (*query); - gst_query_set_seeking (*query, GST_FORMAT_TIME, seekable, 0, - duration); + gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0, duration); res = TRUE; } } break; } case GST_QUERY_FORMATS: - *query = gst_query_make_writable (*query); - gst_query_set_formatsv (*query, 3, fmtlist); + gst_query_set_formatsv (query, 3, fmtlist); res = TRUE; break; case GST_QUERY_CONVERT: @@ -3170,18 +3394,40 @@ gst_base_parse_query (GstPad * pad, GstQuery ** query) GstFormat src_format, dest_format; gint64 src_value, dest_value; - gst_query_parse_convert (*query, &src_format, &src_value, + gst_query_parse_convert (query, &src_format, &src_value, &dest_format, &dest_value); res = gst_base_parse_convert (parse, src_format, src_value, dest_format, &dest_value); if (res) { - *query = gst_query_make_writable (*query); - gst_query_set_convert (*query, src_format, src_value, + gst_query_set_convert (query, src_format, src_value, dest_format, dest_value); } break; } + case GST_QUERY_LATENCY: + { + if ((res = gst_pad_peer_query (parse->sinkpad, query))) { + gboolean live; + GstClockTime min_latency, max_latency; + + gst_query_parse_latency (query, &live, &min_latency, &max_latency); + GST_DEBUG_OBJECT (parse, "Peer latency: live %d, min %" + GST_TIME_FORMAT " max %" GST_TIME_FORMAT, live, + GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency)); + + GST_OBJECT_LOCK (parse); + /* add our latency */ + if (min_latency != -1) + min_latency += parse->priv->min_latency; + if (max_latency != -1) + max_latency += parse->priv->max_latency; + GST_OBJECT_UNLOCK (parse); + + gst_query_set_latency (query, live, min_latency, max_latency); + } + break; + } default: res = gst_pad_query_default (pad, query); break; @@ -3230,7 +3476,7 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos, buf = frame.buffer; GST_LOG_OBJECT (parse, "peek parsing frame at offset %" G_GUINT64_FORMAT - " (%#" G_GINT64_MODIFIER "x) of size %d", + " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT, GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET (buf), gst_buffer_get_size (buf)); @@ -3349,7 +3595,7 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time, GST_TIME_ARGS (time), newpos); ret = gst_base_parse_find_frame (parse, &newpos, &newtime, &dur); - if (ret == GST_FLOW_UNEXPECTED) { + if (ret == GST_FLOW_EOS) { /* heuristic HACK */ hpos = MAX (lpos, hpos - chunk); continue; @@ -3452,7 +3698,6 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) gboolean flush, update, res = TRUE, accurate; gint64 cur, stop, seekpos, seekstop; GstSegment seeksegment = { 0, }; - GstFormat dstformat; GstClockTime start_ts; gst_event_parse_seek (event, &rate, &format, &flags, @@ -3519,12 +3764,11 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) NULL); } else { start_ts = seeksegment.position; - dstformat = GST_FORMAT_BYTES; if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.position, - &dstformat, &seekpos)) + GST_FORMAT_BYTES, &seekpos)) goto convert_failed; if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.stop, - &dstformat, &seekstop)) + GST_FORMAT_BYTES, &seekstop)) goto convert_failed; } @@ -3565,8 +3809,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) /* prepare for streaming again */ if (flush) { GST_DEBUG_OBJECT (parse, "sending flush stop"); - gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ()); - gst_pad_push_event (parse->sinkpad, gst_event_new_flush_stop ()); + gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop (TRUE)); + gst_pad_push_event (parse->sinkpad, gst_event_new_flush_stop (TRUE)); gst_base_parse_clear_queues (parse); } else { /* keep track of our position */ @@ -3633,14 +3877,15 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) } else { GstEvent *new_event; GstBaseParseSeek *seek; + GstSeekFlags flags = (flush ? GST_SEEK_FLAG_FLUSH : GST_SEEK_FLAG_NONE); /* The only thing we need to do in PUSH-mode is to send the seek event (in bytes) to upstream. Segment / flush handling happens in corresponding src event handlers */ GST_DEBUG_OBJECT (parse, "seek in PUSH mode"); - if (seekstop >= 0 && seekpos <= seekpos) + if (seekstop >= 0 && seekstop <= seekpos) seekstop = seekpos; - new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush, + new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET, seekpos, stop_type, seekstop); /* store segment info so its precise details can be reconstructed when @@ -3721,22 +3966,27 @@ gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event) } } -static gboolean -gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps) +static GstCaps * +gst_base_parse_sink_getcaps (GstPad * pad, GstCaps * filter) { GstBaseParse *parse; GstBaseParseClass *klass; - gboolean res = TRUE; + GstCaps *caps; - parse = GST_BASE_PARSE (GST_PAD_PARENT (pad)); + parse = GST_BASE_PARSE (gst_pad_get_parent (pad)); klass = GST_BASE_PARSE_GET_CLASS (parse); + g_assert (pad == GST_BASE_PARSE_SINK_PAD (parse)); - GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps); + if (klass->get_sink_caps) + caps = klass->get_sink_caps (parse, filter); + else + caps = gst_pad_proxy_getcaps (pad, filter); + gst_object_unref (parse); - if (klass->set_sink_caps) - res = klass->set_sink_caps (parse, caps); + GST_LOG_OBJECT (parse, "sink getcaps returning caps %" GST_PTR_FORMAT, caps); + + return caps; - return res; } static void