gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
- /* segment times are typically estimates,
- * actual frame data might lead subclass to different timestamps,
- * so override segment start from what is supplied there */
- if (G_UNLIKELY (parse->pending_segment && !parse->priv->passthrough &&
- GST_CLOCK_TIME_IS_VALID (last_start))) {
- gst_event_unref (parse->pending_segment);
- /* stop time possibly lost this way,
- * but unlikely and not really supported */
- parse->pending_segment =
- gst_event_new_new_segment (FALSE, parse->segment.rate,
- parse->segment.format, last_start, -1, last_start);
+ /* segment adjustment magic; only if we are running the whole show */
+ if (!parse->priv->passthrough &&
+ (parse->priv->pad_mode == GST_ACTIVATE_PULL ||
+ parse->priv->upstream_seekable)) {
+ /* segment times are typically estimates,
+ * actual frame data might lead subclass to different timestamps,
+ * so override segment start from what is supplied there */
+ if (G_UNLIKELY (parse->pending_segment &&
+ GST_CLOCK_TIME_IS_VALID (last_start))) {
+ gst_event_unref (parse->pending_segment);
+ parse->segment.start =
+ MIN ((guint64) last_start, (guint64) parse->segment.stop);
+ GST_DEBUG_OBJECT (parse,
+ "adjusting pending segment start to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (parse->segment.start));
+ parse->pending_segment =
+ gst_event_new_new_segment (FALSE, parse->segment.rate,
+ parse->segment.format, parse->segment.start, parse->segment.stop,
+ parse->segment.start);
+ }
+ /* handle gaps, e.g. non-zero start-time, in as much not handled by above */
+ if (GST_CLOCK_TIME_IS_VALID (parse->segment.last_stop) &&
+ GST_CLOCK_TIME_IS_VALID (last_start)) {
+ GstClockTimeDiff diff;
+
+ /* only send newsegments with increasing start times,
+ * otherwise if these go back and forth downstream (sinks) increase
+ * accumulated time and running_time */
+ diff = GST_CLOCK_DIFF (parse->segment.last_stop, last_start);
+ if (G_UNLIKELY (diff > 2 * GST_SECOND && last_start > parse->segment.start
+ && (!GST_CLOCK_TIME_IS_VALID (parse->segment.stop) ||
+ last_start < parse->segment.stop))) {
+ GST_DEBUG_OBJECT (parse,
+ "Gap of %" G_GINT64_FORMAT " ns detected in stream "
+ "(%" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT "). "
+ "Sending updated NEWSEGMENT events", diff,
+ GST_TIME_ARGS (parse->segment.last_stop),
+ GST_TIME_ARGS (last_start));
+ if (G_UNLIKELY (parse->pending_segment)) {
+ gst_event_unref (parse->pending_segment);
+ parse->segment.start = last_start;
+ parse->pending_segment =
+ gst_event_new_new_segment (FALSE, parse->segment.rate,
+ parse->segment.format, parse->segment.start, parse->segment.stop,
+ parse->segment.start);
+ } else {
+ /* send newsegment events such that the gap is not accounted in
+ * accum time, hence running_time */
+ /* close ahead of gap */
+ gst_pad_push_event (parse->srcpad,
+ gst_event_new_new_segment (TRUE, parse->segment.rate,
+ parse->segment.format, parse->segment.last_stop,
+ parse->segment.last_stop, parse->segment.last_stop));
+ /* skip gap */
+ gst_pad_push_event (parse->srcpad,
+ gst_event_new_new_segment (FALSE, parse->segment.rate,
+ parse->segment.format, last_start, parse->segment.stop,
+ last_start));
+ }
+ /* align segment view with downstream,
+ * prevents double-counting accum when closing segment */
+ gst_segment_set_newsegment (&parse->segment, FALSE,
+ parse->segment.rate, parse->segment.format, last_start,
+ parse->segment.stop, last_start);
+ parse->segment.last_stop = last_start;
+ }
+ }
}
/* and should then also be linked downstream, so safe to send some events */
gst_buffer_unref (buffer);
GST_LOG_OBJECT (parse, "frame (%d bytes) not pushed: %s",
GST_BUFFER_SIZE (buffer), gst_flow_get_name (ret));
+ /* if we are not sufficiently in control, let upstream decide on EOS */
+ if (ret == GST_FLOW_UNEXPECTED &&
+ (parse->priv->passthrough ||
+ (parse->priv->pad_mode == GST_ACTIVATE_PUSH &&
+ !parse->priv->upstream_seekable)))
+ ret = GST_FLOW_OK;
}
/* Update current running segment position */
- if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE)
+ if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE &&
+ parse->segment.last_stop < last_stop)
gst_segment_set_last_stop (&parse->segment, GST_FORMAT_TIME, last_stop);
return ret;
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
- goto need_pause;
+ goto pause;
if (parse->priv->discont) {
GST_DEBUG_OBJECT (parse, "marking DISCONT");
if (ret == GST_FLOW_UNEXPECTED)
goto eos;
else if (ret != GST_FLOW_OK)
- goto need_pause;
+ goto pause;
if (GST_BUFFER_SIZE (outbuf) < fsize)
goto eos;
}
/* This always unrefs the outbuf, even if error occurs */
ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
- if (ret != GST_FLOW_OK) {
- GST_DEBUG_OBJECT (parse, "flow: %s", gst_flow_get_name (ret));
- if (ret == GST_FLOW_UNEXPECTED) {
- gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
- } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
- GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
- ("streaming task paused, reason: %s", gst_flow_get_name (ret)));
- gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
- }
- goto need_pause;
- }
+ if (ret != GST_FLOW_OK)
+ goto pause;
done:
gst_object_unref (parse);
return;
-need_pause:
+ /* ERRORS */
+eos:
{
- GST_LOG_OBJECT (parse, "pausing task %d", ret);
- gst_pad_pause_task (pad);
- gst_object_unref (parse);
- return;
+ ret = GST_FLOW_UNEXPECTED;
+ GST_DEBUG_OBJECT (parse, "eos");
+ /* fall-through */
}
-eos:
+pause:
{
- GST_LOG_OBJECT (parse, "sending eos");
- gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
- goto need_pause;
+ gboolean push_eos = FALSE;
+
+ GST_DEBUG_OBJECT (parse, "pausing task, reason %s",
+ gst_flow_get_name (ret));
+ gst_pad_pause_task (parse->sinkpad);
+
+ if (ret == GST_FLOW_UNEXPECTED) {
+ /* handle end-of-stream/segment */
+ if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+ gint64 stop;
+
+ if ((stop = parse->segment.stop) == -1)
+ stop = parse->segment.duration;
+
+ GST_DEBUG_OBJECT (parse, "sending segment_done");
+
+ gst_element_post_message
+ (GST_ELEMENT_CAST (parse),
+ gst_message_new_segment_done (GST_OBJECT_CAST (parse),
+ GST_FORMAT_TIME, stop));
+ } else {
+ /* If we STILL have zero frames processed, fire an error */
+ if (parse->priv->framecount == 0) {
+ GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE,
+ ("No valid frames found before end of stream"), (NULL));
+ }
+ push_eos = TRUE;
+ }
+ } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
+ /* for fatal errors we post an error message, wrong-state is
+ * not fatal because it happens due to flushes and only means
+ * that we should stop now. */
+ GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
+ ("streaming stopped, reason %s", gst_flow_get_name (ret)));
+ push_eos = TRUE;
+ }
+ if (push_eos) {
+ /* newsegment before eos */
+ if (parse->pending_segment) {
+ gst_pad_push_event (parse->srcpad, parse->pending_segment);
+ parse->pending_segment = NULL;
+ }
+ gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
+ }
+ gst_object_unref (parse);
}
}
if (rate < 0.0)
goto negative_rate;
- if (cur_type != GST_SEEK_TYPE_SET)
+ if (cur_type != GST_SEEK_TYPE_SET ||
+ (stop_type != GST_SEEK_TYPE_SET && stop_type != GST_SEEK_TYPE_NONE))
goto wrong_type;
/* For any format other than TIME, see if upstream handles
}
}
- /* too much estimating going on to support this sensibly,
- * and no eos/end-of-segment loop handling either ... */
- if ((stop_type == GST_SEEK_TYPE_SET && stop != GST_CLOCK_TIME_NONE) ||
- (stop_type != GST_SEEK_TYPE_NONE && stop_type != GST_SEEK_TYPE_SET) ||
- (flags & GST_SEEK_FLAG_SEGMENT))
- goto wrong_type;
- stop = -1;
-
/* get flush flag */
flush = flags & GST_SEEK_FLAG_FLUSH;
gst_segment_set_seek (&seeksegment, rate, format, flags,
cur_type, cur, stop_type, stop, &update);
- /* figure out the last position we need to play. If it's configured (stop !=
- * -1), use that, else we play until the total duration of the file */
- if ((stop = seeksegment.stop) == -1)
- stop = seeksegment.duration;
-
dstformat = GST_FORMAT_BYTES;
if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.last_stop,
- &dstformat, &seekpos)) {
- GST_DEBUG_OBJECT (parse, "conversion failed");
- return FALSE;
- }
+ &dstformat, &seekpos))
+ goto convert_failed;
GST_DEBUG_OBJECT (parse,
"seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
/* This will be sent later in _loop() */
parse->pending_segment =
gst_event_new_new_segment (FALSE, parse->segment.rate,
- parse->segment.format,
- parse->segment.last_stop, stop, parse->segment.last_stop);
+ parse->segment.format, parse->segment.last_stop, parse->segment.stop,
+ parse->segment.last_stop);
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
GST_PAD_STREAM_UNLOCK (parse->sinkpad);
} else {
GstEvent *new_event;
+ gint64 stop_pos;
+
/* The only thing we need to do in PUSH-mode is to send the
seek event (in bytes) to upstream. Segment / flush handling happens
in corresponding src event handlers */
GST_DEBUG_OBJECT (parse, "seek in PUSH mode");
+
+ /* already converted start, need same for stop */
+ dstformat = GST_FORMAT_BYTES;
+ if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.start,
+ &dstformat, &stop_pos))
+ goto convert_failed;
new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush,
- GST_SEEK_TYPE_SET, seekpos, stop_type, stop);
+ GST_SEEK_TYPE_SET, seekpos, stop_type, stop_pos);
res = gst_pad_push_event (parse->sinkpad, new_event);
}
res = FALSE;
goto done;
}
+convert_failed:
+ {
+ GST_DEBUG_OBJECT (parse, "conversion TIME to BYTES failed.");
+ res = FALSE;
+ goto done;
+ }
}
/**