From 3304f194a91a34044c3d31f06e0dd23d874691b3 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Tue, 21 Sep 2010 13:57:10 +0200 Subject: [PATCH] baseparse: proper and more extended segment and seek handling That is, loop pause handling, segment seek support, newsegment for gaps, etc --- gst/audioparsers/gstbaseparse.c | 206 +++++++++++++++++++++++++++++----------- 1 file changed, 151 insertions(+), 55 deletions(-) diff --git a/gst/audioparsers/gstbaseparse.c b/gst/audioparsers/gstbaseparse.c index ca8fdb5..bc50a9c 100644 --- a/gst/audioparsers/gstbaseparse.c +++ b/gst/audioparsers/gstbaseparse.c @@ -1323,17 +1323,73 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer) gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad)); - /* segment times are typically estimates, - * actual frame data might lead subclass to different timestamps, - * so override segment start from what is supplied there */ - if (G_UNLIKELY (parse->pending_segment && !parse->priv->passthrough && - GST_CLOCK_TIME_IS_VALID (last_start))) { - gst_event_unref (parse->pending_segment); - /* stop time possibly lost this way, - * but unlikely and not really supported */ - parse->pending_segment = - gst_event_new_new_segment (FALSE, parse->segment.rate, - parse->segment.format, last_start, -1, last_start); + /* segment adjustment magic; only if we are running the whole show */ + if (!parse->priv->passthrough && + (parse->priv->pad_mode == GST_ACTIVATE_PULL || + parse->priv->upstream_seekable)) { + /* segment times are typically estimates, + * actual frame data might lead subclass to different timestamps, + * so override segment start from what is supplied there */ + if (G_UNLIKELY (parse->pending_segment && + GST_CLOCK_TIME_IS_VALID (last_start))) { + gst_event_unref (parse->pending_segment); + parse->segment.start = + MIN ((guint64) last_start, (guint64) parse->segment.stop); + GST_DEBUG_OBJECT (parse, + "adjusting pending segment start to %" GST_TIME_FORMAT, + GST_TIME_ARGS (parse->segment.start)); + parse->pending_segment = + gst_event_new_new_segment (FALSE, parse->segment.rate, + parse->segment.format, parse->segment.start, parse->segment.stop, + parse->segment.start); + } + /* handle gaps, e.g. non-zero start-time, in as much not handled by above */ + if (GST_CLOCK_TIME_IS_VALID (parse->segment.last_stop) && + GST_CLOCK_TIME_IS_VALID (last_start)) { + GstClockTimeDiff diff; + + /* only send newsegments with increasing start times, + * otherwise if these go back and forth downstream (sinks) increase + * accumulated time and running_time */ + diff = GST_CLOCK_DIFF (parse->segment.last_stop, last_start); + if (G_UNLIKELY (diff > 2 * GST_SECOND && last_start > parse->segment.start + && (!GST_CLOCK_TIME_IS_VALID (parse->segment.stop) || + last_start < parse->segment.stop))) { + GST_DEBUG_OBJECT (parse, + "Gap of %" G_GINT64_FORMAT " ns detected in stream " + "(%" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT "). " + "Sending updated NEWSEGMENT events", diff, + GST_TIME_ARGS (parse->segment.last_stop), + GST_TIME_ARGS (last_start)); + if (G_UNLIKELY (parse->pending_segment)) { + gst_event_unref (parse->pending_segment); + parse->segment.start = last_start; + parse->pending_segment = + gst_event_new_new_segment (FALSE, parse->segment.rate, + parse->segment.format, parse->segment.start, parse->segment.stop, + parse->segment.start); + } else { + /* send newsegment events such that the gap is not accounted in + * accum time, hence running_time */ + /* close ahead of gap */ + gst_pad_push_event (parse->srcpad, + gst_event_new_new_segment (TRUE, parse->segment.rate, + parse->segment.format, parse->segment.last_stop, + parse->segment.last_stop, parse->segment.last_stop)); + /* skip gap */ + gst_pad_push_event (parse->srcpad, + gst_event_new_new_segment (FALSE, parse->segment.rate, + parse->segment.format, last_start, parse->segment.stop, + last_start)); + } + /* align segment view with downstream, + * prevents double-counting accum when closing segment */ + gst_segment_set_newsegment (&parse->segment, FALSE, + parse->segment.rate, parse->segment.format, last_start, + parse->segment.stop, last_start); + parse->segment.last_stop = last_start; + } + } } /* and should then also be linked downstream, so safe to send some events */ @@ -1405,10 +1461,17 @@ gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer) gst_buffer_unref (buffer); GST_LOG_OBJECT (parse, "frame (%d bytes) not pushed: %s", GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret)); + /* if we are not sufficiently in control, let upstream decide on EOS */ + if (ret == GST_FLOW_UNEXPECTED && + (parse->priv->passthrough || + (parse->priv->pad_mode == GST_ACTIVATE_PUSH && + !parse->priv->upstream_seekable))) + ret = GST_FLOW_OK; } /* Update current running segment position */ - if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE) + if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE && + parse->segment.last_stop < last_stop) gst_segment_set_last_stop (&parse->segment, GST_FORMAT_TIME, last_stop); return ret; @@ -1729,7 +1792,7 @@ gst_base_parse_loop (GstPad * pad) if (ret == GST_FLOW_UNEXPECTED) goto eos; else if (ret != GST_FLOW_OK) - goto need_pause; + goto pause; if (parse->priv->discont) { GST_DEBUG_OBJECT (parse, "marking DISCONT"); @@ -1779,7 +1842,7 @@ gst_base_parse_loop (GstPad * pad) if (ret == GST_FLOW_UNEXPECTED) goto eos; else if (ret != GST_FLOW_OK) - goto need_pause; + goto pause; if (GST_BUFFER_SIZE (outbuf) < fsize) goto eos; } @@ -1793,34 +1856,67 @@ gst_base_parse_loop (GstPad * pad) /* This always unrefs the outbuf, even if error occurs */ ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf); - if (ret != GST_FLOW_OK) { - GST_DEBUG_OBJECT (parse, "flow: %s", gst_flow_get_name (ret)); - if (ret == GST_FLOW_UNEXPECTED) { - gst_pad_push_event (parse->srcpad, gst_event_new_eos ()); - } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) { - GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), - ("streaming task paused, reason: %s", gst_flow_get_name (ret))); - gst_pad_push_event (parse->srcpad, gst_event_new_eos ()); - } - goto need_pause; - } + if (ret != GST_FLOW_OK) + goto pause; done: gst_object_unref (parse); return; -need_pause: + /* ERRORS */ +eos: { - GST_LOG_OBJECT (parse, "pausing task %d", ret); - gst_pad_pause_task (pad); - gst_object_unref (parse); - return; + ret = GST_FLOW_UNEXPECTED; + GST_DEBUG_OBJECT (parse, "eos"); + /* fall-through */ } -eos: +pause: { - GST_LOG_OBJECT (parse, "sending eos"); - gst_pad_push_event (parse->srcpad, gst_event_new_eos ()); - goto need_pause; + gboolean push_eos = FALSE; + + GST_DEBUG_OBJECT (parse, "pausing task, reason %s", + gst_flow_get_name (ret)); + gst_pad_pause_task (parse->sinkpad); + + if (ret == GST_FLOW_UNEXPECTED) { + /* handle end-of-stream/segment */ + if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) { + gint64 stop; + + if ((stop = parse->segment.stop) == -1) + stop = parse->segment.duration; + + GST_DEBUG_OBJECT (parse, "sending segment_done"); + + gst_element_post_message + (GST_ELEMENT_CAST (parse), + gst_message_new_segment_done (GST_OBJECT_CAST (parse), + GST_FORMAT_TIME, stop)); + } else { + /* If we STILL have zero frames processed, fire an error */ + if (parse->priv->framecount == 0) { + GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE, + ("No valid frames found before end of stream"), (NULL)); + } + push_eos = TRUE; + } + } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) { + /* for fatal errors we post an error message, wrong-state is + * not fatal because it happens due to flushes and only means + * that we should stop now. */ + GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL), + ("streaming stopped, reason %s", gst_flow_get_name (ret))); + push_eos = TRUE; + } + if (push_eos) { + /* newsegment before eos */ + if (parse->pending_segment) { + gst_pad_push_event (parse->srcpad, parse->pending_segment); + parse->pending_segment = NULL; + } + gst_pad_push_event (parse->srcpad, gst_event_new_eos ()); + } + gst_object_unref (parse); } } @@ -2368,7 +2464,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) if (rate < 0.0) goto negative_rate; - if (cur_type != GST_SEEK_TYPE_SET) + if (cur_type != GST_SEEK_TYPE_SET || + (stop_type != GST_SEEK_TYPE_SET && stop_type != GST_SEEK_TYPE_NONE)) goto wrong_type; /* For any format other than TIME, see if upstream handles @@ -2384,14 +2481,6 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) } } - /* too much estimating going on to support this sensibly, - * and no eos/end-of-segment loop handling either ... */ - if ((stop_type == GST_SEEK_TYPE_SET && stop != GST_CLOCK_TIME_NONE) || - (stop_type != GST_SEEK_TYPE_NONE && stop_type != GST_SEEK_TYPE_SET) || - (flags & GST_SEEK_FLAG_SEGMENT)) - goto wrong_type; - stop = -1; - /* get flush flag */ flush = flags & GST_SEEK_FLAG_FLUSH; @@ -2403,17 +2492,10 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) gst_segment_set_seek (&seeksegment, rate, format, flags, cur_type, cur, stop_type, stop, &update); - /* figure out the last position we need to play. If it's configured (stop != - * -1), use that, else we play until the total duration of the file */ - if ((stop = seeksegment.stop) == -1) - stop = seeksegment.duration; - dstformat = GST_FORMAT_BYTES; if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.last_stop, - &dstformat, &seekpos)) { - GST_DEBUG_OBJECT (parse, "conversion failed"); - return FALSE; - } + &dstformat, &seekpos)) + goto convert_failed; GST_DEBUG_OBJECT (parse, "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur, @@ -2476,8 +2558,8 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) /* This will be sent later in _loop() */ parse->pending_segment = gst_event_new_new_segment (FALSE, parse->segment.rate, - parse->segment.format, - parse->segment.last_stop, stop, parse->segment.last_stop); + parse->segment.format, parse->segment.last_stop, parse->segment.stop, + parse->segment.last_stop); GST_DEBUG_OBJECT (parse, "Created newseg format %d, " "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT @@ -2502,12 +2584,20 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event) GST_PAD_STREAM_UNLOCK (parse->sinkpad); } else { GstEvent *new_event; + gint64 stop_pos; + /* The only thing we need to do in PUSH-mode is to send the seek event (in bytes) to upstream. Segment / flush handling happens in corresponding src event handlers */ GST_DEBUG_OBJECT (parse, "seek in PUSH mode"); + + /* already converted start, need same for stop */ + dstformat = GST_FORMAT_BYTES; + if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.start, + &dstformat, &stop_pos)) + goto convert_failed; new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush, - GST_SEEK_TYPE_SET, seekpos, stop_type, stop); + GST_SEEK_TYPE_SET, seekpos, stop_type, stop_pos); res = gst_pad_push_event (parse->sinkpad, new_event); } @@ -2528,6 +2618,12 @@ wrong_type: res = FALSE; goto done; } +convert_failed: + { + GST_DEBUG_OBJECT (parse, "conversion TIME to BYTES failed."); + res = FALSE; + goto done; + } } /** -- 2.7.4