From 4b8f411c5d94ed956521c358c6052f7ff59af06a Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Thu, 21 Jul 2022 15:26:14 +0200 Subject: [PATCH] multiqueue: Handle gapless input When dealing with gapless input (i.e. streams with changing group-id in GST_EVENT_STREAM_START), we need to take into account the elapsed running-time (if applicable) in order to properly calculate levels and output time. Without doing this all incoming data from future groups would be considered as being "late" and would be consumed immediately. This does **NOT** modify the actual segment and buffer times, and is only used internally. Part-of: --- .../gstreamer/plugins/elements/gstmultiqueue.c | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/subprojects/gstreamer/plugins/elements/gstmultiqueue.c b/subprojects/gstreamer/plugins/elements/gstmultiqueue.c index 10ad203..d23f8ec 100644 --- a/subprojects/gstreamer/plugins/elements/gstmultiqueue.c +++ b/subprojects/gstreamer/plugins/elements/gstmultiqueue.c @@ -147,6 +147,15 @@ struct _GstSingleQueue /* TRUE if either position needs to be recalculated */ gboolean sink_tainted, src_tainted; + /* stream group id */ + guint32 sink_stream_gid; + guint32 src_stream_gid; + + /* TRUE if the stream group-id changed. Resetted to FALSE the next time the + * segment is calculated */ + gboolean sink_stream_gid_changed; + gboolean src_stream_gid_changed; + /* queue of data */ GstDataQueue *queue; GstDataQueueSize max_size, extra_size; @@ -1343,6 +1352,9 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq) } sq->sink_tainted = sq->src_tainted = TRUE; + sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID; + sq->sink_stream_gid_changed = FALSE; + sq->src_stream_gid_changed = FALSE; return result; } @@ -1358,6 +1370,9 @@ gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq) gst_object_unref (srcpad); } sq->sink_tainted = sq->src_tainted = TRUE; + sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID; + sq->sink_stream_gid_changed = FALSE; + sq->src_stream_gid_changed = FALSE; return result; } @@ -1762,6 +1777,23 @@ static void apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, GstSegment * segment) { + GstClockTimeDiff ppos = 0; + + /* If we switched groups, grab the previous position */ + if (segment->rate > 0.0) { + if (segment == &sq->sink_segment && sq->sink_stream_gid_changed) { + ppos = + gst_segment_to_running_time (segment, GST_FORMAT_TIME, + segment->position); + sq->sink_stream_gid_changed = FALSE; + } else if (segment == &sq->src_segment && sq->src_stream_gid_changed) { + ppos = + gst_segment_to_running_time (segment, GST_FORMAT_TIME, + segment->position); + sq->src_stream_gid_changed = FALSE; + } + } + gst_event_copy_segment (event, segment); /* now configure the values, we use these to track timestamps on the @@ -1776,12 +1808,19 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, } GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (ppos) { + GST_DEBUG_OBJECT (mq, "queue %d, Applying base of %" GST_TIME_FORMAT, + sq->id, GST_TIME_ARGS (ppos)); + segment->base = ppos; + } + /* Make sure we have a valid initial segment position (and not garbage * from upstream) */ if (segment->rate > 0.0) segment->position = segment->start; else segment->position = segment->stop; + if (segment == &sq->sink_segment) sq->sink_tainted = TRUE; else { @@ -1982,10 +2021,21 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, *allow_drop = FALSE; break; case GST_EVENT_STREAM_START: + { + guint32 group_id; + if (gst_event_parse_group_id (event, &group_id)) { + if (sq->src_stream_gid == GST_GROUP_ID_INVALID) { + sq->src_stream_gid = group_id; + } else if (group_id != sq->src_stream_gid) { + sq->src_stream_gid = group_id; + sq->src_stream_gid_changed = TRUE; + } + } result = GST_FLOW_OK; if (G_UNLIKELY (*allow_drop)) *allow_drop = FALSE; break; + } case GST_EVENT_SEGMENT: apply_segment (mq, sq, event, &sq->src_segment); /* Applying the segment may have made the queue non-full again, unblock it if needed */ @@ -2607,6 +2657,15 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (type) { case GST_EVENT_STREAM_START: { + guint32 group_id; + if (gst_event_parse_group_id (event, &group_id)) { + if (sq->sink_stream_gid == GST_GROUP_ID_INVALID) { + sq->sink_stream_gid = group_id; + } else if (group_id != sq->sink_stream_gid) { + sq->sink_stream_gid = group_id; + sq->sink_stream_gid_changed = TRUE; + } + } if (mq->sync_by_running_time) { GstStreamFlags stream_flags; gst_event_parse_stream_flags (event, &stream_flags); @@ -3473,6 +3532,10 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->sink_tainted = TRUE; sq->src_tainted = TRUE; + sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID; + sq->sink_stream_gid_changed = FALSE; + sq->src_stream_gid_changed = FALSE; + name = g_strdup_printf ("sink_%u", sq->id); templ = gst_static_pad_template_get (&sinktemplate); sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name, -- 2.7.4