multiqueue: Optimize multiqueue sizes based on interleave
authorEdward Hervey <edward@centricular.com>
Mon, 26 Oct 2015 07:06:01 +0000 (08:06 +0100)
committerEdward Hervey <bilboed@bilboed.com>
Wed, 2 Dec 2015 15:03:20 +0000 (16:03 +0100)
Multiqueue should only be used to cope with:
* decoupling upstream and dowstream threading (i.e. having separate threads
  for elementary streams).
* Ensuring individual queues have enough space to cope with upstream interleave
  (distance in stream time between co-located samples). This is to guarantee
  that we have enough room in each individual queues to provide new data in
  each, without being blocked.
* Limit the queue sizes to that interleave distance (and an extra minimal
  buffering size). This is to ensure we don't consume too much memory.

Based on that, multiqueue now continuously calculates the input interleave
(per incoming streaming thread). Based on that, it calculates a target
interleave (currently 1.5 x real_interleave + 250ms padding).

If the target interleave is greater than the current max_size.time, it will
update it accordingly (to allow enough margin to not block).
If the target interleave goes down by more than 50%, we re-adjust it once
we know we have gone past a safe distance (2 x current max_size.time).

This mode can only be used for incoming streams that are guaranteed to be
properly timestamped.

Furthermore, we ignore sparse streams when calculating interleave and maximum
size of queues.

For the simplest of use-cases (single stream), multiqueue acts as a single
queue with a time limit of 250ms.
If there are multiple inputs, but each come from a different streaming thread,
the maximum time limit will also end up being 250ms.

On regular files (more than one input stream from the same upstream streaming
thread), it can reduce the total memory used as much as 10x, ending up with
max_size.time around 500ms.

Due to the adaptive nature, it can also cope with changing interleave (which
can happen commonly on some files at startup/pre-roll time)

plugins/elements/gstmultiqueue.c
plugins/elements/gstmultiqueue.h

index 642bba6..b32e3f0 100644 (file)
@@ -150,6 +150,8 @@ struct _GstSingleQueue
 
   /* position of src/sink */
   GstClockTime sinktime, srctime;
+  /* cached input value, used for interleave */
+  GstClockTimeDiff cached_sinktime;
   /* TRUE if either position needs to be recalculated */
   gboolean sink_tainted, src_tainted;
 
@@ -160,6 +162,7 @@ struct _GstSingleQueue
   gboolean is_eos;
   gboolean is_sparse;
   gboolean flushing;
+  gboolean active;
 
   /* Protected by global lock */
   guint32 nextid;               /* ID of the next object waiting to be pushed */
@@ -173,6 +176,9 @@ struct _GstSingleQueue
   GCond query_handled;
   gboolean last_query;
   GstQuery *last_handled_query;
+
+  /* For interleave calculation */
+  GThread *thread;
 };
 
 
@@ -249,6 +255,7 @@ enum
 #define DEFAULT_LOW_PERCENT   10
 #define DEFAULT_HIGH_PERCENT  99
 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
+#define DEFAULT_USE_INTERLEAVE FALSE
 
 enum
 {
@@ -263,6 +270,7 @@ enum
   PROP_LOW_PERCENT,
   PROP_HIGH_PERCENT,
   PROP_SYNC_BY_RUNNING_TIME,
+  PROP_USE_INTERLEAVE,
   PROP_LAST
 };
 
@@ -429,6 +437,11 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
           DEFAULT_SYNC_BY_RUNNING_TIME,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE,
+      g_param_spec_boolean ("use-interleave", "Use interleave",
+          "Adjust time limits based on input interleave",
+          DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gobject_class->finalize = gst_multi_queue_finalize;
 
   gst_element_class_set_static_metadata (gstelement_class,
@@ -466,6 +479,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
   mqueue->high_percent = DEFAULT_HIGH_PERCENT;
 
   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+  mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
 
   mqueue->counter = 1;
   mqueue->highid = -1;
@@ -607,6 +621,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
     case PROP_SYNC_BY_RUNNING_TIME:
       mq->sync_by_running_time = g_value_get_boolean (value);
       break;
+    case PROP_USE_INTERLEAVE:
+      mq->use_interleave = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -652,6 +669,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id,
     case PROP_SYNC_BY_RUNNING_TIME:
       g_value_set_boolean (value, mq->sync_by_running_time);
       break;
+    case PROP_USE_INTERLEAVE:
+      g_value_set_boolean (value, mq->use_interleave);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -868,6 +888,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     sq->last_oldid = G_MAXUINT32;
     sq->next_time = GST_CLOCK_TIME_NONE;
     sq->last_time = GST_CLOCK_TIME_NONE;
+    sq->cached_sinktime = GST_CLOCK_TIME_NONE;
     gst_data_queue_set_flushing (sq->queue, FALSE);
 
     /* Reset high time to be recomputed next */
@@ -985,6 +1006,79 @@ gst_multi_queue_post_buffering (GstMultiQueue * mq)
   g_mutex_unlock (&mq->buffering_post_lock);
 }
 
+static void
+calculate_interleave (GstMultiQueue * mq)
+{
+  GstClockTimeDiff low, high;
+  GstClockTime interleave;
+  GList *tmp;
+
+  low = high = GST_CLOCK_STIME_NONE;
+  interleave = mq->interleave;
+  /* Go over all single queues and calculate lowest/highest value */
+  for (tmp = mq->queues; tmp; tmp = tmp->next) {
+    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+    /* Ignore sparse streams for interleave calculation */
+    if (sq->is_sparse)
+      continue;
+    /* If a stream is not active yet (hasn't received any buffers), set
+     * a maximum interleave to allow it to receive more data */
+    if (!sq->active) {
+      GST_LOG_OBJECT (mq,
+          "queue %d is not active yet, forcing interleave to 5s", sq->id);
+      mq->interleave = 5 * GST_SECOND;
+      /* Update max-size time */
+      mq->max_size.time = mq->interleave;
+      SET_CHILD_PROPERTY (mq, time);
+      goto beach;
+    }
+    if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
+      if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
+        low = sq->cached_sinktime;
+      if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
+        high = sq->cached_sinktime;
+    }
+    GST_LOG_OBJECT (mq,
+        "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
+        " high:%" GST_STIME_FORMAT, sq->id,
+        GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
+        GST_STIME_ARGS (high));
+  }
+
+  if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
+    interleave = high - low;
+    /* Padding of interleave and minimum value */
+    /* FIXME : Make the minimum time interleave a property */
+    interleave = (150 * interleave / 100) + 250 * GST_MSECOND;
+
+    /* Update the stored interleave if:
+     * * No data has arrived yet (high == low)
+     * * Or it went higher
+     * * Or it went lower and we've gone past the previous interleave needed */
+    if (high == low || interleave > mq->interleave ||
+        ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
+                        mq->interleave)) < low)
+            && interleave < (mq->interleave * 3 / 4))) {
+      /* Update the interleave */
+      mq->interleave = interleave;
+      mq->last_interleave_update = high;
+      /* Update max-size time */
+      mq->max_size.time = mq->interleave;
+      SET_CHILD_PROPERTY (mq, time);
+    }
+  }
+
+beach:
+  GST_DEBUG_OBJECT (mq,
+      "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
+      GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
+      " last_interleave_update:%" GST_TIME_FORMAT, GST_TIME_ARGS (low),
+      GST_TIME_ARGS (high), GST_TIME_ARGS (interleave),
+      GST_TIME_ARGS (mq->interleave),
+      GST_TIME_ARGS (mq->last_interleave_update));
+}
+
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. 
  * WITH LOCK TAKEN */
@@ -1013,6 +1107,11 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
     if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) {
       /* if we have a time, we become untainted and use the time */
       sq->sink_tainted = FALSE;
+      if (mq->use_interleave) {
+        sq->cached_sinktime = sink_time;
+        calculate_interleave (mq);
+      }
+    }
   } else
     sink_time = sq->sinktime;
 
@@ -1039,8 +1138,9 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
     src_time = sq->srctime =
         gst_segment_to_running_time (segment, GST_FORMAT_TIME, position);
     /* if we have a time, we become untainted and use the time */
-    if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
+    if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE)) {
       sq->src_tainted = FALSE;
+    }
   } else
     src_time = sq->srctime;
 
@@ -1114,8 +1214,9 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
   if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT,
-      sq->id, GST_TIME_ARGS (timestamp));
+  GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
+      sq->id, segment == &sq->sink_segment ? "sink" : "src",
+      GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
 
@@ -1681,16 +1782,44 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (sq->is_eos)
     goto was_eos;
 
+  sq->active = TRUE;
+
   /* Get a unique incrementing id */
   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
-      sq->id, buffer, curid);
+  timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
+  duration = GST_BUFFER_DURATION (buffer);
+
+  GST_LOG_OBJECT (mq,
+      "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
+      GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
+      sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
 
   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
 
-  timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
-  duration = GST_BUFFER_DURATION (buffer);
+  /* Update interleave before pushing data into queue */
+  if (mq->use_interleave) {
+    GstClockTime val = timestamp;
+    GstClockTimeDiff dval;
+
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+    if (val == GST_CLOCK_TIME_NONE)
+      val = sq->sink_segment.position;
+    if (duration != GST_CLOCK_TIME_NONE)
+      val += duration;
+
+    dval = my_segment_to_running_time (&sq->sink_segment, val);
+    if (GST_CLOCK_STIME_IS_VALID (dval)) {
+      sq->cached_sinktime = dval;
+      GST_DEBUG_OBJECT (mq,
+          "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
+          GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
+          GST_STIME_ARGS (sq->cached_sinktime));
+      calculate_interleave (mq);
+    }
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+  }
 
   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
     goto flushing;
@@ -1804,6 +1933,8 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
         sq->thread = g_thread_self ();
       }
 
+      sq->thread = g_thread_self ();
+
       /* Remove EOS flag */
       sq->is_eos = FALSE;
     }
@@ -1827,10 +1958,29 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       goto done;
 
     case GST_EVENT_SEGMENT:
+      sref = gst_event_ref (event);
+      break;
     case GST_EVENT_GAP:
       /* take ref because the queue will take ownership and we need the event
        * afterwards to update the segment */
       sref = gst_event_ref (event);
+      if (mq->use_interleave) {
+        GstClockTime val, dur;
+        gst_event_parse_gap (event, &val, &dur);
+        if (GST_CLOCK_TIME_IS_VALID (val)) {
+          GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+          if (GST_CLOCK_TIME_IS_VALID (dur))
+            val += dur;
+          val =
+              gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
+              val);
+          if (GST_CLOCK_TIME_IS_VALID (val)) {
+            sq->cached_sinktime = val;
+            calculate_interleave (mq);
+          }
+          GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+        }
+      }
       break;
 
     default:
@@ -1896,6 +2046,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     case GST_EVENT_GAP:
+      sq->active = TRUE;
       apply_gap (mq, sq, sref, &sq->sink_segment);
       gst_event_unref (sref);
     default:
@@ -2455,6 +2606,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   sq->is_eos = FALSE;
   sq->is_sparse = FALSE;
   sq->flushing = FALSE;
+  sq->active = FALSE;
   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
 
index d63eda5..f09fd58 100644 (file)
@@ -51,6 +51,7 @@ struct _GstMultiQueue {
   GstElement element;
 
   gboolean sync_by_running_time;
+  gboolean use_interleave;
 
   /* number of queues */
   guint        nbqueues;
@@ -77,6 +78,9 @@ struct _GstMultiQueue {
 
   gboolean percent_changed;
   GMutex buffering_post_lock; /* assures only one posted at a time */
+
+  GstClockTime interleave;     /* Input interleave */
+  GstClockTime last_interleave_update;
 };
 
 struct _GstMultiQueueClass {