compositor: perform conversions in parallel
author Matthew Waters <matthew@centricular.com>
Fri, 2 Apr 2021 05:43:02 +0000 (16:43 +1100)
committer Matthew Waters <matthew@centricular.com>
Mon, 17 May 2021 09:20:57 +0000 (19:20 +1000)
Improves throughput of the overall convert and blend process and allows
for higher performance by scaling across slightly more threads.

Also make use of the video aggregator's task pool for blending in order
to reduce the total number of threads.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1129>
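
For reference, the runner reworked below is built on core's task-pool API;
a minimal sketch of the push/join pattern it relies on (assuming a GStreamer
build that ships GstSharedTaskPool; the worker function and thread count are
placeholders):

    #include <gst/gst.h>

    static void
    do_blend_slice (gpointer user_data)
    {
      /* one slice of the convert/blend work would go here */
    }

    static void
    push_join_example (void)
    {
      GstTaskPool *pool;
      gpointer task;

      /* a shared pool with a fixed thread budget, as the runner
       * creates for itself when no pool is handed in */
      pool = gst_shared_task_pool_new ();
      gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4);
      gst_task_pool_prepare (pool, NULL);

      /* push () schedules the function on a pool thread and hands back
       * a handle that join () blocks on until the work is done */
      task = gst_task_pool_push (pool, do_blend_slice, NULL, NULL);
      if (task != NULL)
        gst_task_pool_join (pool, task);

      gst_task_pool_cleanup (pool);
      gst_object_unref (pool);
    }

With the shared pool, push () returns a handle that join () waits on, which
is why the runner below asserts a non-NULL task after pushing.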

docs/plugins/gst_plugins_cache.json
gst/compositor/compositor.c
gst/compositor/compositor.h

index eef719f711b912b14d3e91b1b39984c60d30787b..10205dc248ab3a7e746ad96b55b1b2596583f9e3 100644
             "GstCompositorPad": {
                 "hierarchy": [
                     "GstCompositorPad",
+                    "GstVideoAggregatorParallelConvertPad",
                     "GstVideoAggregatorConvertPad",
                     "GstVideoAggregatorPad",
                     "GstAggregatorPad",
index 38eba7616d012904d9c0a0e878c9504fce31762d..8a891f1fc192c78504176f208dd21e7bbdca7a7b 100644
@@ -183,7 +183,7 @@ enum
 };
 
 G_DEFINE_TYPE (GstCompositorPad, gst_compositor_pad,
-    GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD);
+    GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD);
 
 static void
 gst_compositor_pad_get_property (GObject * object, guint prop_id,
@@ -386,8 +386,8 @@ _pad_obscures_rectangle (GstVideoAggregator * vagg, GstVideoAggregatorPad * pad,
   return TRUE;
 }
 
-static gboolean
-gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
+static void
+gst_compositor_pad_prepare_frame_start (GstVideoAggregatorPad * pad,
     GstVideoAggregator * vagg, GstBuffer * buffer,
     GstVideoFrame * prepared_frame)
 {
@@ -417,7 +417,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
 
   if (cpad->alpha == 0.0) {
     GST_DEBUG_OBJECT (pad, "Pad has alpha 0.0, not converting frame");
-    goto done;
+    return;
   }
 
   frame_rect = clamp_rectangle (cpad->xpos, cpad->ypos, width, height,
@@ -426,7 +426,7 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   if (frame_rect.w == 0 || frame_rect.h == 0) {
     GST_DEBUG_OBJECT (pad, "Resulting frame is zero-width or zero-height "
         "(w: %i, h: %i), skipping", frame_rect.w, frame_rect.h);
-    goto done;
+    return;
   }
 
   GST_OBJECT_LOCK (vagg);
@@ -446,16 +446,11 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
   GST_OBJECT_UNLOCK (vagg);
 
   if (frame_obscured)
-    goto done;
+    return;
 
-  return
-      GST_VIDEO_AGGREGATOR_PAD_CLASS
-      (gst_compositor_pad_parent_class)->prepare_frame (pad, vagg, buffer,
+  GST_VIDEO_AGGREGATOR_PAD_CLASS
+      (gst_compositor_pad_parent_class)->prepare_frame_start (pad, vagg, buffer,
       prepared_frame);
-
-done:
-
-  return TRUE;
 }
 
 static void
@@ -539,8 +534,8 @@ gst_compositor_pad_class_init (GstCompositorPadClass * klass)
           GST_TYPE_COMPOSITOR_OPERATOR, DEFAULT_PAD_OPERATOR,
           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
 
-  vaggpadclass->prepare_frame =
-      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame);
+  vaggpadclass->prepare_frame_start =
+      GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame_start);
 
   vaggcpadclass->create_conversion_info =
       GST_DEBUG_FUNCPTR (gst_compositor_pad_create_conversion_info);
@@ -861,123 +856,105 @@ _fixate_caps (GstAggregator * agg, GstCaps * caps)
   return ret;
 }
 
-static gpointer
+static void
 gst_parallelized_task_thread_func (gpointer data)
 {
-  GstParallelizedTaskThread *self = data;
-
-  g_mutex_lock (&self->runner->lock);
-  self->runner->n_done++;
-  if (self->runner->n_done == self->runner->n_threads - 1)
-    g_cond_signal (&self->runner->cond_done);
-
-  do {
-    gint idx;
-
-    while (self->runner->n_todo == -1 && !self->runner->quit)
-      g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
-
-    if (self->runner->quit)
-      break;
+  GstParallelizedTaskRunner *runner = data;
+  gint idx;
 
-    idx = self->runner->n_todo--;
-    g_assert (self->runner->n_todo >= -1);
-    g_mutex_unlock (&self->runner->lock);
+  g_mutex_lock (&runner->lock);
+  idx = runner->n_todo--;
+  g_assert (runner->n_todo >= -1);
+  g_mutex_unlock (&runner->lock);
 
-    g_assert (self->runner->func != NULL);
+  g_assert (runner->func != NULL);
 
-    self->runner->func (self->runner->task_data[idx]);
-
-    g_mutex_lock (&self->runner->lock);
-    self->runner->n_done++;
-    if (self->runner->n_done == self->runner->n_threads - 1)
-      g_cond_signal (&self->runner->cond_done);
-  } while (TRUE);
+  runner->func (runner->task_data[idx]);
+}
 
-  g_mutex_unlock (&self->runner->lock);
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+  gboolean joined = FALSE;
 
-  return NULL;
+  while (!joined) {
+    g_mutex_lock (&self->lock);
+    if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+      gpointer task = gst_queue_array_pop_head (self->tasks);
+      g_mutex_unlock (&self->lock);
+      gst_task_pool_join (self->pool, task);
+    } else {
+      g_mutex_unlock (&self->lock);
+    }
+  }
 }
 
 static void
 gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
 {
-  guint i;
-
-  g_mutex_lock (&self->lock);
-  self->quit = TRUE;
-  g_cond_broadcast (&self->cond_todo);
-  g_mutex_unlock (&self->lock);
-
-  for (i = 1; i < self->n_threads; i++) {
-    if (!self->threads[i].thread)
-      continue;
-
-    g_thread_join (self->threads[i].thread);
-  }
+  gst_parallelized_task_runner_join (self);
 
+  gst_queue_array_free (self->tasks);
+  if (self->own_pool)
+    gst_task_pool_cleanup (self->pool);
+  gst_object_unref (self->pool);
   g_mutex_clear (&self->lock);
-  g_cond_clear (&self->cond_todo);
-  g_cond_clear (&self->cond_done);
-  g_free (self->threads);
   g_free (self);
 }
 
 static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool,
+    gboolean async_tasks)
 {
   GstParallelizedTaskRunner *self;
-  guint i;
-  GError *err = NULL;
 
   if (n_threads == 0)
     n_threads = g_get_num_processors ();
 
   self = g_new0 (GstParallelizedTaskRunner, 1);
+
+  if (pool) {
+    self->pool = g_object_ref (pool);
+    self->own_pool = FALSE;
+
+    /* No reason to split up the work between more threads than the
+     * pool can spawn */
+    if (GST_IS_SHARED_TASK_POOL (pool))
+      n_threads =
+          MIN (n_threads,
+          gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+  } else {
+    self->pool = gst_shared_task_pool_new ();
+    self->own_pool = TRUE;
+    gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+        n_threads);
+    gst_task_pool_prepare (self->pool, NULL);
+  }
+
+  self->tasks = gst_queue_array_new (n_threads);
+
   self->n_threads = n_threads;
-  self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
 
-  self->quit = FALSE;
   self->n_todo = -1;
-  self->n_done = 0;
   g_mutex_init (&self->lock);
-  g_cond_init (&self->cond_todo);
-  g_cond_init (&self->cond_done);
 
   /* Set when scheduling a job */
   self->func = NULL;
   self->task_data = NULL;
-
-  for (i = 0; i < n_threads; i++) {
-    self->threads[i].runner = self;
-    self->threads[i].idx = i;
-
-    /* First thread is the one calling run() */
-    if (i > 0) {
-      self->threads[i].thread =
-          g_thread_try_new ("compositor-blend",
-          gst_parallelized_task_thread_func, &self->threads[i], &err);
-      if (!self->threads[i].thread)
-        goto error;
-    }
-  }
-
-  g_mutex_lock (&self->lock);
-  while (self->n_done < self->n_threads - 1)
-    g_cond_wait (&self->cond_done, &self->lock);
-  self->n_done = 0;
-  g_mutex_unlock (&self->lock);
+  self->async_tasks = async_tasks;
 
   return self;
+}
 
-error:
-  {
-    GST_ERROR ("Failed to start thread %u: %s", i, err->message);
-    g_clear_error (&err);
+static void
+gst_parallelized_task_runner_finish (GstParallelizedTaskRunner * self)
+{
+  g_return_if_fail (self->func != NULL);
 
-    gst_parallelized_task_runner_free (self);
-    return NULL;
-  }
+  gst_parallelized_task_runner_join (self);
+
+  self->func = NULL;
+  self->task_data = NULL;
 }
 
 static void
@@ -989,32 +966,43 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self,
   self->func = func;
   self->task_data = task_data;
 
-  if (n_threads > 1) {
+  if (n_threads > 1 || self->async_tasks) {
+    guint i = 0;
     g_mutex_lock (&self->lock);
-    self->n_todo = self->n_threads - 2;
-    self->n_done = 0;
-    g_cond_broadcast (&self->cond_todo);
+    self->n_todo = self->n_threads - 1;
+    if (!self->async_tasks) {
+      /* if not async, perform one of the functions in the current thread */
+      self->n_todo--;
+      i = 1;
+    }
+    for (; i < n_threads; i++) {
+      gpointer task =
+          gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+          self, NULL);
+
+      /* The return value of push() is nullable but NULL is only returned
+       * with the shared task pool when gst_task_pool_prepare() has not been
+       * called and would thus be a programming error that we should hard-fail
+       * on.
+       */
+      g_assert (task != NULL);
+      gst_queue_array_push_tail (self->tasks, task);
+    }
     g_mutex_unlock (&self->lock);
   }
 
-  self->func (self->task_data[self->n_threads - 1]);
+  if (!self->async_tasks) {
+    self->func (self->task_data[self->n_threads - 1]);
 
-  if (n_threads > 1) {
-    g_mutex_lock (&self->lock);
-    while (self->n_done < self->n_threads - 1)
-      g_cond_wait (&self->cond_done, &self->lock);
-    self->n_done = 0;
-    g_mutex_unlock (&self->lock);
+    gst_parallelized_task_runner_finish (self);
   }
-
-  self->func = NULL;
-  self->task_data = NULL;
 }
 
 static gboolean
 _negotiated_caps (GstAggregator * agg, GstCaps * caps)
 {
   GstCompositor *compositor = GST_COMPOSITOR (agg);
+  GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg);
   GstVideoInfo v_info;
   guint n_threads;
 
@@ -1041,8 +1029,12 @@ _negotiated_caps (GstAggregator * agg, GstCaps * caps)
     gst_parallelized_task_runner_free (compositor->blend_runner);
     compositor->blend_runner = NULL;
   }
-  if (!compositor->blend_runner)
-    compositor->blend_runner = gst_parallelized_task_runner_new (n_threads);
+  if (!compositor->blend_runner) {
+    GstTaskPool *pool = gst_video_aggregator_get_execution_task_pool (vagg);
+    compositor->blend_runner =
+        gst_parallelized_task_runner_new (n_threads, pool, FALSE);
+    gst_clear_object (&pool);
+  }
 
   return GST_AGGREGATOR_CLASS (parent_class)->negotiated_src_caps (agg, caps);
 }
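
Taken together, a caller drives the reworked runner like this (a hypothetical
sketch mirroring _negotiated_caps () above, reusing the do_blend_slice worker
from the earlier sketch; real code sizes task_data from runner->n_threads,
since the runner may clamp the requested thread count to the pool's limit):

    static void
    blend_in_parallel (GstVideoAggregator * vagg)
    {
      GstTaskPool *pool = gst_video_aggregator_get_execution_task_pool (vagg);
      GstParallelizedTaskRunner *runner;
      /* one opaque work descriptor per thread; their contents are
       * entirely up to the caller */
      gpointer task_data[2] = { NULL, NULL };

      /* async_tasks = FALSE: run () executes one item on the calling
       * thread and joins the pooled tasks before returning */
      runner = gst_parallelized_task_runner_new (2, pool, FALSE);
      gst_clear_object (&pool);

      gst_parallelized_task_runner_run (runner, do_blend_slice, task_data);

      gst_parallelized_task_runner_free (runner);
    }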
index 4067fb1db274fc56205186cd8a4721442ae9f3a1..38e069315daea7487793c99a61ff4e7eb90d6701 100644
@@ -24,6 +24,7 @@
 #include <gst/gst.h>
 #include <gst/video/video.h>
 #include <gst/video/gstvideoaggregator.h>
+#include <gst/base/base.h>
 
 #include "blend.h"
 
@@ -35,7 +36,7 @@ G_DECLARE_FINAL_TYPE (GstCompositor, gst_compositor, GST, COMPOSITOR,
 
 #define GST_TYPE_COMPOSITOR_PAD (gst_compositor_pad_get_type())
 G_DECLARE_FINAL_TYPE (GstCompositorPad, gst_compositor_pad, GST, COMPOSITOR_PAD,
-    GstVideoAggregatorConvertPad)
+    GstVideoAggregatorParallelConvertPad)
 
 /**
  * GstCompositorBackground:
@@ -80,28 +81,22 @@ typedef enum
 typedef void (*GstParallelizedTaskFunc) (gpointer user_data);
 
 typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
-typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
-
-struct _GstParallelizedTaskThread
-{
-  GstParallelizedTaskRunner *runner;
-  guint idx;
-  GThread *thread;
-};
 
 struct _GstParallelizedTaskRunner
 {
+  GstTaskPool *pool;
+  gboolean own_pool;
   guint n_threads;
 
-  GstParallelizedTaskThread *threads;
+  GstQueueArray *tasks;
 
   GstParallelizedTaskFunc func;
   gpointer *task_data;
 
   GMutex lock;
-  GCond cond_todo, cond_done;
-  gint n_todo, n_done;
-  gboolean quit;
+  gint n_todo;
+
+  gboolean async_tasks;
 };
 
 /**
@@ -139,7 +134,7 @@ struct _GstCompositor
  */
 struct _GstCompositorPad
 {
-  GstVideoAggregatorConvertPad parent;
+  GstVideoAggregatorParallelConvertPad parent;
 
   /* properties */
   gint xpos, ypos;