From 72c31876aa979992beadf783ed59e26d8af33428 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Mon, 26 Oct 2015 08:06:01 +0100 Subject: [PATCH] multiqueue: Optimize multiqueue sizes based on interleave Multiqueue should only be used to cope with: * decoupling upstream and dowstream threading (i.e. having separate threads for elementary streams). * Ensuring individual queues have enough space to cope with upstream interleave (distance in stream time between co-located samples). This is to guarantee that we have enough room in each individual queues to provide new data in each, without being blocked. * Limit the queue sizes to that interleave distance (and an extra minimal buffering size). This is to ensure we don't consume too much memory. Based on that, multiqueue now continuously calculates the input interleave (per incoming streaming thread). Based on that, it calculates a target interleave (currently 1.5 x real_interleave + 250ms padding). If the target interleave is greater than the current max_size.time, it will update it accordingly (to allow enough margin to not block). If the target interleave goes down by more than 50%, we re-adjust it once we know we have gone past a safe distance (2 x current max_size.time). This mode can only be used for incoming streams that are guaranteed to be properly timestamped. Furthermore, we ignore sparse streams when calculating interleave and maximum size of queues. For the simplest of use-cases (single stream), multiqueue acts as a single queue with a time limit of 250ms. If there are multiple inputs, but each come from a different streaming thread, the maximum time limit will also end up being 250ms. On regular files (more than one input stream from the same upstream streaming thread), it can reduce the total memory used as much as 10x, ending up with max_size.time around 500ms. Due to the adaptive nature, it can also cope with changing interleave (which can happen commonly on some files at startup/pre-roll time) --- plugins/elements/gstmultiqueue.c | 166 +++++++++++++++++++++++++++++++++++++-- plugins/elements/gstmultiqueue.h | 4 + 2 files changed, 163 insertions(+), 7 deletions(-) diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 642bba6..b32e3f0 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -150,6 +150,8 @@ struct _GstSingleQueue /* position of src/sink */ GstClockTime sinktime, srctime; + /* cached input value, used for interleave */ + GstClockTimeDiff cached_sinktime; /* TRUE if either position needs to be recalculated */ gboolean sink_tainted, src_tainted; @@ -160,6 +162,7 @@ struct _GstSingleQueue gboolean is_eos; gboolean is_sparse; gboolean flushing; + gboolean active; /* Protected by global lock */ guint32 nextid; /* ID of the next object waiting to be pushed */ @@ -173,6 +176,9 @@ struct _GstSingleQueue GCond query_handled; gboolean last_query; GstQuery *last_handled_query; + + /* For interleave calculation */ + GThread *thread; }; @@ -249,6 +255,7 @@ enum #define DEFAULT_LOW_PERCENT 10 #define DEFAULT_HIGH_PERCENT 99 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE +#define DEFAULT_USE_INTERLEAVE FALSE enum { @@ -263,6 +270,7 @@ enum PROP_LOW_PERCENT, PROP_HIGH_PERCENT, PROP_SYNC_BY_RUNNING_TIME, + PROP_USE_INTERLEAVE, PROP_LAST }; @@ -429,6 +437,11 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass) DEFAULT_SYNC_BY_RUNNING_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE, + g_param_spec_boolean ("use-interleave", "Use interleave", + "Adjust time limits based on input interleave", + DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gobject_class->finalize = gst_multi_queue_finalize; gst_element_class_set_static_metadata (gstelement_class, @@ -466,6 +479,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue) mqueue->high_percent = DEFAULT_HIGH_PERCENT; mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME; + mqueue->use_interleave = DEFAULT_USE_INTERLEAVE; mqueue->counter = 1; mqueue->highid = -1; @@ -607,6 +621,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, case PROP_SYNC_BY_RUNNING_TIME: mq->sync_by_running_time = g_value_get_boolean (value); break; + case PROP_USE_INTERLEAVE: + mq->use_interleave = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -652,6 +669,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id, case PROP_SYNC_BY_RUNNING_TIME: g_value_set_boolean (value, mq->sync_by_running_time); break; + case PROP_USE_INTERLEAVE: + g_value_set_boolean (value, mq->use_interleave); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -868,6 +888,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, sq->last_oldid = G_MAXUINT32; sq->next_time = GST_CLOCK_TIME_NONE; sq->last_time = GST_CLOCK_TIME_NONE; + sq->cached_sinktime = GST_CLOCK_TIME_NONE; gst_data_queue_set_flushing (sq->queue, FALSE); /* Reset high time to be recomputed next */ @@ -985,6 +1006,79 @@ gst_multi_queue_post_buffering (GstMultiQueue * mq) g_mutex_unlock (&mq->buffering_post_lock); } +static void +calculate_interleave (GstMultiQueue * mq) +{ + GstClockTimeDiff low, high; + GstClockTime interleave; + GList *tmp; + + low = high = GST_CLOCK_STIME_NONE; + interleave = mq->interleave; + /* Go over all single queues and calculate lowest/highest value */ + for (tmp = mq->queues; tmp; tmp = tmp->next) { + GstSingleQueue *sq = (GstSingleQueue *) tmp->data; + /* Ignore sparse streams for interleave calculation */ + if (sq->is_sparse) + continue; + /* If a stream is not active yet (hasn't received any buffers), set + * a maximum interleave to allow it to receive more data */ + if (!sq->active) { + GST_LOG_OBJECT (mq, + "queue %d is not active yet, forcing interleave to 5s", sq->id); + mq->interleave = 5 * GST_SECOND; + /* Update max-size time */ + mq->max_size.time = mq->interleave; + SET_CHILD_PROPERTY (mq, time); + goto beach; + } + if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) { + if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low) + low = sq->cached_sinktime; + if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high) + high = sq->cached_sinktime; + } + GST_LOG_OBJECT (mq, + "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT + " high:%" GST_STIME_FORMAT, sq->id, + GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low), + GST_STIME_ARGS (high)); + } + + if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) { + interleave = high - low; + /* Padding of interleave and minimum value */ + /* FIXME : Make the minimum time interleave a property */ + interleave = (150 * interleave / 100) + 250 * GST_MSECOND; + + /* Update the stored interleave if: + * * No data has arrived yet (high == low) + * * Or it went higher + * * Or it went lower and we've gone past the previous interleave needed */ + if (high == low || interleave > mq->interleave || + ((mq->last_interleave_update + (2 * MIN (GST_SECOND, + mq->interleave)) < low) + && interleave < (mq->interleave * 3 / 4))) { + /* Update the interleave */ + mq->interleave = interleave; + mq->last_interleave_update = high; + /* Update max-size time */ + mq->max_size.time = mq->interleave; + SET_CHILD_PROPERTY (mq, time); + } + } + +beach: + GST_DEBUG_OBJECT (mq, + "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%" + GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT + " last_interleave_update:%" GST_TIME_FORMAT, GST_TIME_ARGS (low), + GST_TIME_ARGS (high), GST_TIME_ARGS (interleave), + GST_TIME_ARGS (mq->interleave), + GST_TIME_ARGS (mq->last_interleave_update)); +} + + /* calculate the diff between running time on the sink and src of the queue. * This is the total amount of time in the queue. * WITH LOCK TAKEN */ @@ -1013,6 +1107,11 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) { /* if we have a time, we become untainted and use the time */ sq->sink_tainted = FALSE; + if (mq->use_interleave) { + sq->cached_sinktime = sink_time; + calculate_interleave (mq); + } + } } else sink_time = sq->sinktime; @@ -1039,8 +1138,9 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) src_time = sq->srctime = gst_segment_to_running_time (segment, GST_FORMAT_TIME, position); /* if we have a time, we become untainted and use the time */ - if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) + if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) { sq->src_tainted = FALSE; + } } else src_time = sq->srctime; @@ -1114,8 +1214,9 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp, if (duration != GST_CLOCK_TIME_NONE) timestamp += duration; - GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT, - sq->id, GST_TIME_ARGS (timestamp)); + GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT, + sq->id, segment == &sq->sink_segment ? "sink" : "src", + GST_TIME_ARGS (timestamp)); segment->position = timestamp; @@ -1681,16 +1782,44 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (sq->is_eos) goto was_eos; + sq->active = TRUE; + /* Get a unique incrementing id */ curid = g_atomic_int_add ((gint *) & mq->counter, 1); - GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d", - sq->id, buffer, curid); + timestamp = GST_BUFFER_DTS_OR_PTS (buffer); + duration = GST_BUFFER_DURATION (buffer); + + GST_LOG_OBJECT (mq, + "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%" + GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")", + sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), + GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration)); item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid); - timestamp = GST_BUFFER_DTS_OR_PTS (buffer); - duration = GST_BUFFER_DURATION (buffer); + /* Update interleave before pushing data into queue */ + if (mq->use_interleave) { + GstClockTime val = timestamp; + GstClockTimeDiff dval; + + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (val == GST_CLOCK_TIME_NONE) + val = sq->sink_segment.position; + if (duration != GST_CLOCK_TIME_NONE) + val += duration; + + dval = my_segment_to_running_time (&sq->sink_segment, val); + if (GST_CLOCK_STIME_IS_VALID (dval)) { + sq->cached_sinktime = dval; + GST_DEBUG_OBJECT (mq, + "Queue %d cached sink time now %" G_GINT64_FORMAT " %" + GST_STIME_FORMAT, sq->id, sq->cached_sinktime, + GST_STIME_ARGS (sq->cached_sinktime)); + calculate_interleave (mq); + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) goto flushing; @@ -1804,6 +1933,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) sq->thread = g_thread_self (); } + sq->thread = g_thread_self (); + /* Remove EOS flag */ sq->is_eos = FALSE; } @@ -1827,10 +1958,29 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) goto done; case GST_EVENT_SEGMENT: + sref = gst_event_ref (event); + break; case GST_EVENT_GAP: /* take ref because the queue will take ownership and we need the event * afterwards to update the segment */ sref = gst_event_ref (event); + if (mq->use_interleave) { + GstClockTime val, dur; + gst_event_parse_gap (event, &val, &dur); + if (GST_CLOCK_TIME_IS_VALID (val)) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (GST_CLOCK_TIME_IS_VALID (dur)) + val += dur; + val = + gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, + val); + if (GST_CLOCK_TIME_IS_VALID (val)) { + sq->cached_sinktime = val; + calculate_interleave (mq); + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + } + } break; default: @@ -1896,6 +2046,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); break; case GST_EVENT_GAP: + sq->active = TRUE; apply_gap (mq, sq, sref, &sq->sink_segment); gst_event_unref (sref); default: @@ -2455,6 +2606,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->is_eos = FALSE; sq->is_sparse = FALSE; sq->flushing = FALSE; + sq->active = FALSE; gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index d63eda5..f09fd58 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -51,6 +51,7 @@ struct _GstMultiQueue { GstElement element; gboolean sync_by_running_time; + gboolean use_interleave; /* number of queues */ guint nbqueues; @@ -77,6 +78,9 @@ struct _GstMultiQueue { gboolean percent_changed; GMutex buffering_post_lock; /* assures only one posted at a time */ + + GstClockTime interleave; /* Input interleave */ + GstClockTime last_interleave_update; }; struct _GstMultiQueueClass { -- 2.7.4