video-converter: switch to using a task pool ..
authorMathieu Duponchelle <mathieu@centricular.com>
Tue, 27 Oct 2020 23:01:16 +0000 (00:01 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 12 Nov 2020 17:38:34 +0000 (17:38 +0000)
.. and make use of that API in videoaggregator.

When setting certain properties, such as cropping or the scaled
size of pads, a new converter is created by videoaggregator.

Before that patch, this implied spawning new threads, potentially
at each aggregate cycle when interpolating pad properties. This
is obviously wasteful, and re-using a task pool removes that
overhead.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/896>

gst-libs/gst/video/gstvideoaggregator.c
gst-libs/gst/video/video-converter.c
gst-libs/gst/video/video-converter.h
tests/check/libs/video.c

index 4e186e0..d0f0a1a 100644 (file)
@@ -415,6 +415,8 @@ struct _GstVideoAggregatorConvertPadPrivate
    * and as such are protected with the object lock */
   GstStructure *converter_config;
   gboolean converter_config_changed;
+
+  GstTaskPool *task_pool;
 };
 
 G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad,
@@ -433,6 +435,11 @@ gst_video_aggregator_convert_pad_finalize (GObject * o)
     gst_structure_free (vaggpad->priv->converter_config);
   vaggpad->priv->converter_config = NULL;
 
+  if (vaggpad->priv->task_pool)
+    gst_task_pool_cleanup (vaggpad->priv->task_pool);
+
+  gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL);
+
   G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o);
 }
 
@@ -447,6 +454,15 @@ static void
   GST_OBJECT_UNLOCK (pad);
 }
 
+static guint
+get_opt_uint (const GstStructure * config, const gchar * opt, guint def)
+{
+  guint res;
+  if (!gst_structure_get_uint (config, opt, &res))
+    res = def;
+  return res;
+}
+
 static gboolean
 gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
     GstVideoAggregator * vagg, GstBuffer * buffer,
@@ -475,10 +491,22 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
     pad->priv->convert = NULL;
 
     if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) {
+      if (pad->priv->converter_config) {
+        guint n_threads = get_opt_uint (pad->priv->converter_config,
+            GST_VIDEO_CONVERTER_OPT_THREADS, 1);
+
+        if (n_threads == 0 || n_threads > g_get_num_processors ())
+          n_threads = g_get_num_processors ();
+
+        gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv->
+                task_pool), n_threads);
+      }
+
       pad->priv->convert =
-          gst_video_converter_new (&vpad->info, &pad->priv->conversion_info,
-          pad->priv->converter_config ? gst_structure_copy (pad->priv->
-              converter_config) : NULL);
+          gst_video_converter_new_with_pool (&vpad->info,
+          &pad->priv->conversion_info,
+          pad->priv->converter_config ? gst_structure_copy (pad->
+              priv->converter_config) : NULL, pad->priv->task_pool);
       if (!pad->priv->convert) {
         GST_WARNING_OBJECT (pad, "No path found for conversion");
         return FALSE;
@@ -486,8 +514,8 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
 
       GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s",
           gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)),
-          gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->
-                  priv->conversion_info)));
+          gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv->
+                  conversion_info)));
     } else {
       GST_DEBUG_OBJECT (pad, "This pad will not need conversion");
     }
@@ -689,8 +717,10 @@ gst_video_aggregator_convert_pad_init (GstVideoAggregatorConvertPad * vaggpad)
   vaggpad->priv->convert = NULL;
   vaggpad->priv->converter_config = NULL;
   vaggpad->priv->converter_config_changed = FALSE;
-}
+  vaggpad->priv->task_pool = gst_shared_task_pool_new ();
 
+  gst_task_pool_prepare (vaggpad->priv->task_pool, NULL);
+}
 
 /**
  * gst_video_aggregator_convert_pad_update_conversion_info:
index 7a8a6e2..3bfee92 100644 (file)
@@ -34,6 +34,7 @@
 #include <glib.h>
 #include <string.h>
 #include <math.h>
+#include <gst/base/base.h>
 
 #include "video-orc.h"
 
@@ -120,161 +121,107 @@ typedef void (*GstParallelizedTaskFunc) (gpointer user_data);
 typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
 typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
 
-struct _GstParallelizedTaskThread
-{
-  GstParallelizedTaskRunner *runner;
-  guint idx;
-  GThread *thread;
-};
-
 struct _GstParallelizedTaskRunner
 {
+  GstTaskPool *pool;
+  gboolean own_pool;
   guint n_threads;
 
-  GstParallelizedTaskThread *threads;
+  GstQueueArray *tasks;
 
   GstParallelizedTaskFunc func;
   gpointer *task_data;
 
   GMutex lock;
-  GCond cond_todo, cond_done;
-  gint n_todo, n_done;
-  gboolean quit;
+  gint n_todo;
 };
 
-static gpointer
+static void
 gst_parallelized_task_thread_func (gpointer data)
 {
-  GstParallelizedTaskThread *self = data;
-
-#if 0
-#ifdef HAVE_PTHREAD
-  {
-    pthread_t thread = pthread_self ();
-    cpu_set_t cpuset;
-    int r;
-
-    CPU_ZERO (&cpuset);
-    CPU_SET (self->idx, &cpuset);
-    if ((r = pthread_setaffinity_np (thread, sizeof (cpuset), &cpuset)) != 0)
-      GST_ERROR ("Failed to set thread affinity for thread %d: %s", self->idx,
-          g_strerror (r));
-  }
-#endif
-#endif
-
-  g_mutex_lock (&self->runner->lock);
-  self->runner->n_done++;
-  if (self->runner->n_done == self->runner->n_threads - 1)
-    g_cond_signal (&self->runner->cond_done);
-
-  do {
-    gint idx;
-
-    while (self->runner->n_todo == -1 && !self->runner->quit)
-      g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
-
-    if (self->runner->quit)
-      break;
-
-    idx = self->runner->n_todo--;
-    g_assert (self->runner->n_todo >= -1);
-    g_mutex_unlock (&self->runner->lock);
+  GstParallelizedTaskRunner *runner = data;
+  gint idx;
 
-    g_assert (self->runner->func != NULL);
+  g_mutex_lock (&runner->lock);
+  idx = runner->n_todo--;
+  g_assert (runner->n_todo >= -1);
+  g_mutex_unlock (&runner->lock);
 
-    self->runner->func (self->runner->task_data[idx]);
+  g_assert (runner->func != NULL);
 
-    g_mutex_lock (&self->runner->lock);
-    self->runner->n_done++;
-    if (self->runner->n_done == self->runner->n_threads - 1)
-      g_cond_signal (&self->runner->cond_done);
-  } while (TRUE);
+  runner->func (runner->task_data[idx]);
+}
 
-  g_mutex_unlock (&self->runner->lock);
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+  gboolean joined = FALSE;
 
-  return NULL;
+  while (!joined) {
+    g_mutex_lock (&self->lock);
+    if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+      gpointer task = gst_queue_array_pop_head (self->tasks);
+      g_mutex_unlock (&self->lock);
+      gst_task_pool_join (self->pool, task);
+    } else {
+      g_mutex_unlock (&self->lock);
+    }
+  }
 }
 
 static void
 gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
 {
-  guint i;
-
-  g_mutex_lock (&self->lock);
-  self->quit = TRUE;
-  g_cond_broadcast (&self->cond_todo);
-  g_mutex_unlock (&self->lock);
-
-  for (i = 1; i < self->n_threads; i++) {
-    if (!self->threads[i].thread)
-      continue;
-
-    g_thread_join (self->threads[i].thread);
-  }
+  gst_parallelized_task_runner_join (self);
 
+  gst_queue_array_free (self->tasks);
+  if (self->own_pool)
+    gst_task_pool_cleanup (self->pool);
+  gst_object_unref (self->pool);
   g_mutex_clear (&self->lock);
-  g_cond_clear (&self->cond_todo);
-  g_cond_clear (&self->cond_done);
-  g_free (self->threads);
   g_free (self);
 }
 
 static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool)
 {
   GstParallelizedTaskRunner *self;
-  guint i;
-  GError *err = NULL;
 
   if (n_threads == 0)
     n_threads = g_get_num_processors ();
 
   self = g_new0 (GstParallelizedTaskRunner, 1);
+
+  if (pool) {
+    self->pool = g_object_ref (pool);
+    self->own_pool = FALSE;
+
+    /* No reason to split up the work between more threads than the
+     * pool can spawn */
+    if (GST_IS_SHARED_TASK_POOL (pool))
+      n_threads =
+          MIN (n_threads,
+          gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+  } else {
+    self->pool = gst_shared_task_pool_new ();
+    self->own_pool = TRUE;
+    gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+        n_threads);
+    gst_task_pool_prepare (self->pool, NULL);
+  }
+
+  self->tasks = gst_queue_array_new (n_threads);
+
   self->n_threads = n_threads;
-  self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
 
-  self->quit = FALSE;
   self->n_todo = -1;
-  self->n_done = 0;
   g_mutex_init (&self->lock);
-  g_cond_init (&self->cond_todo);
-  g_cond_init (&self->cond_done);
 
   /* Set when scheduling a job */
   self->func = NULL;
   self->task_data = NULL;
 
-  for (i = 0; i < n_threads; i++) {
-    self->threads[i].runner = self;
-    self->threads[i].idx = i;
-
-    /* First thread is the one calling run() */
-    if (i > 0) {
-      self->threads[i].thread =
-          g_thread_try_new ("videoconvert", gst_parallelized_task_thread_func,
-          &self->threads[i], &err);
-      if (!self->threads[i].thread)
-        goto error;
-    }
-  }
-
-  g_mutex_lock (&self->lock);
-  while (self->n_done < self->n_threads - 1)
-    g_cond_wait (&self->cond_done, &self->lock);
-  self->n_done = 0;
-  g_mutex_unlock (&self->lock);
-
   return self;
-
-error:
-  {
-    GST_ERROR ("Failed to start thread %u: %s", i, err->message);
-    g_clear_error (&err);
-
-    gst_parallelized_task_runner_free (self);
-    return NULL;
-  }
 }
 
 static void
@@ -287,22 +234,24 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self,
   self->task_data = task_data;
 
   if (n_threads > 1) {
+    guint i = 0;
     g_mutex_lock (&self->lock);
     self->n_todo = self->n_threads - 2;
-    self->n_done = 0;
-    g_cond_broadcast (&self->cond_todo);
+    for (i = 1; i < n_threads; i++) {
+      gpointer task =
+          gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+          self, NULL);
+
+      /* The return value of push() is unfortunately nullable, and we can't deal with that */
+      g_assert (task != NULL);
+      gst_queue_array_push_tail (self->tasks, task);
+    }
     g_mutex_unlock (&self->lock);
   }
 
   self->func (self->task_data[self->n_threads - 1]);
 
-  if (n_threads > 1) {
-    g_mutex_lock (&self->lock);
-    while (self->n_done < self->n_threads - 1)
-      g_cond_wait (&self->cond_done, &self->lock);
-    self->n_done = 0;
-    g_mutex_unlock (&self->lock);
-  }
+  gst_parallelized_task_runner_join (self);
 
   self->func = NULL;
   self->task_data = NULL;
@@ -2281,21 +2230,25 @@ convert_get_alpha_mode (GstVideoConverter * convert)
 }
 
 /**
- * gst_video_converter_new: (skip)
+ * gst_video_converter_new_with_pool: (skip)
  * @in_info: a #GstVideoInfo
  * @out_info: a #GstVideoInfo
  * @config: (transfer full): a #GstStructure with configuration options
+ * @pool: (nullable): a #GstTaskPool to spawn threads from
  *
  * Create a new converter object to convert between @in_info and @out_info
  * with @config.
  *
+ * The optional @pool can be used to spawn threads, this is useful when
+ * creating new converters rapidly, for example when updating cropping.
+ *
  * Returns: a #GstVideoConverter or %NULL if conversion is not possible.
  *
- * Since: 1.6
+ * Since: 1.20
  */
 GstVideoConverter *
-gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
-    GstStructure * config)
+gst_video_converter_new_with_pool (GstVideoInfo * in_info,
+    GstVideoInfo * out_info, GstStructure * config, GstTaskPool * pool)
 {
   GstVideoConverter *convert;
   GstLineCache *prev;
@@ -2431,7 +2384,8 @@ gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
   if (n_threads < 1)
     n_threads = 1;
 
-  convert->conversion_runner = gst_parallelized_task_runner_new (n_threads);
+  convert->conversion_runner =
+      gst_parallelized_task_runner_new (n_threads, pool);
 
   if (video_converter_lookup_fastpath (convert))
     goto done;
@@ -2520,6 +2474,26 @@ no_pack_func:
   }
 }
 
+/**
+ * gst_video_converter_new: (skip)
+ * @in_info: a #GstVideoInfo
+ * @out_info: a #GstVideoInfo
+ * @config: (transfer full): a #GstStructure with configuration options
+ *
+ * Create a new converter object to convert between @in_info and @out_info
+ * with @config.
+ *
+ * Returns: a #GstVideoConverter or %NULL if conversion is not possible.
+ *
+ * Since: 1.6
+ */
+GstVideoConverter *
+gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
+    GstStructure * config)
+{
+  return gst_video_converter_new_with_pool (in_info, out_info, config, NULL);
+}
+
 static void
 clear_matrix_data (MatrixData * data)
 {
index bdf03af..5c82d44 100644 (file)
@@ -281,6 +281,12 @@ GstVideoConverter *  gst_video_converter_new            (GstVideoInfo *in_info,
                                                          GstStructure *config);
 
 GST_VIDEO_API
+GstVideoConverter * gst_video_converter_new_with_pool   (GstVideoInfo * in_info,
+                                                         GstVideoInfo * out_info,
+                                                         GstStructure * config,
+                                                         GstTaskPool  * pool);
+
+GST_VIDEO_API
 void                 gst_video_converter_free           (GstVideoConverter * convert);
 
 GST_VIDEO_API
index acea722..b9b9ec7 100644 (file)
@@ -2719,6 +2719,84 @@ GST_START_TEST (test_video_convert)
 
 GST_END_TEST;
 
+GST_START_TEST (test_video_convert_multithreading)
+{
+  GstVideoInfo ininfo, outinfo;
+  GstVideoFrame inframe, outframe, refframe;
+  GstBuffer *inbuffer, *outbuffer, *refbuffer;
+  GstVideoConverter *convert;
+  GstMapInfo info;
+  GstTaskPool *pool;
+
+  /* Large enough input resolution for video-converter to actually use
+   * 4 threads if required */
+  fail_unless (gst_video_info_set_format (&ininfo, GST_VIDEO_FORMAT_ARGB, 1280,
+          720));
+  inbuffer = gst_buffer_new_and_alloc (ininfo.size);
+  gst_buffer_memset (inbuffer, 0, 0, -1);
+  gst_video_frame_map (&inframe, &ininfo, inbuffer, GST_MAP_READ);
+
+  fail_unless (gst_video_info_set_format (&outinfo, GST_VIDEO_FORMAT_BGRx, 400,
+          300));
+  outbuffer = gst_buffer_new_and_alloc (outinfo.size);
+  refbuffer = gst_buffer_new_and_alloc (outinfo.size);
+
+  gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
+  gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
+
+  /* Single threaded-conversion */
+  convert = gst_video_converter_new (&ininfo, &outinfo,
+      gst_structure_new_empty ("options"));
+  gst_video_converter_frame (convert, &inframe, &refframe);
+  gst_video_converter_free (convert);
+
+  /* Multithreaded conversion, converter creates pool */
+  convert = gst_video_converter_new (&ininfo, &outinfo,
+      gst_structure_new ("options",
+          GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL)
+      );
+  gst_video_converter_frame (convert, &inframe, &outframe);
+  gst_video_converter_free (convert);
+
+  gst_video_frame_unmap (&outframe);
+  gst_video_frame_unmap (&refframe);
+
+  gst_buffer_map (outbuffer, &info, GST_MAP_READ);
+  fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
+  gst_buffer_unmap (outbuffer, &info);
+
+  gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
+  gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
+
+  /* Multi-threaded conversion, user-provided pool */
+  pool = gst_shared_task_pool_new ();
+  gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4);
+  gst_task_pool_prepare (pool, NULL);
+  convert = gst_video_converter_new_with_pool (&ininfo, &outinfo,
+      gst_structure_new ("options",
+          GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL), pool);
+  gst_video_converter_frame (convert, &inframe, &outframe);
+  gst_video_converter_free (convert);
+  gst_task_pool_cleanup (pool);
+  gst_object_unref (pool);
+
+  gst_video_frame_unmap (&outframe);
+  gst_video_frame_unmap (&refframe);
+
+  gst_buffer_map (outbuffer, &info, GST_MAP_READ);
+  fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
+  gst_buffer_unmap (outbuffer, &info);
+
+
+  gst_buffer_unref (refbuffer);
+  gst_buffer_unref (outbuffer);
+  gst_video_frame_unmap (&inframe);
+  gst_buffer_unref (inbuffer);
+
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_video_transfer)
 {
   gint i, j;
@@ -3889,6 +3967,7 @@ video_suite (void)
   tcase_add_test (tc_chain, test_video_color_convert_other);
   tcase_add_test (tc_chain, test_video_size_convert);
   tcase_add_test (tc_chain, test_video_convert);
+  tcase_add_test (tc_chain, test_video_convert_multithreading);
   tcase_add_test (tc_chain, test_video_transfer);
   tcase_add_test (tc_chain, test_overlay_blend);
   tcase_add_test (tc_chain, test_video_center_rect);