* and as such are protected with the object lock */
GstStructure *converter_config;
gboolean converter_config_changed;
+
+ GstTaskPool *task_pool;
};
G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad,
gst_structure_free (vaggpad->priv->converter_config);
vaggpad->priv->converter_config = NULL;
+ if (vaggpad->priv->task_pool)
+ gst_task_pool_cleanup (vaggpad->priv->task_pool);
+
+ gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL);
+
G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o);
}
GST_OBJECT_UNLOCK (pad);
}
+static guint
+get_opt_uint (const GstStructure * config, const gchar * opt, guint def)
+{
+ guint res;
+ if (!gst_structure_get_uint (config, opt, &res))
+ res = def;
+ return res;
+}
+
static gboolean
gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
GstVideoAggregator * vagg, GstBuffer * buffer,
pad->priv->convert = NULL;
if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) {
+ if (pad->priv->converter_config) {
+ guint n_threads = get_opt_uint (pad->priv->converter_config,
+ GST_VIDEO_CONVERTER_OPT_THREADS, 1);
+
+ if (n_threads == 0 || n_threads > g_get_num_processors ())
+ n_threads = g_get_num_processors ();
+
+ gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv->
+ task_pool), n_threads);
+ }
+
pad->priv->convert =
- gst_video_converter_new (&vpad->info, &pad->priv->conversion_info,
- pad->priv->converter_config ? gst_structure_copy (pad->priv->
- converter_config) : NULL);
+ gst_video_converter_new_with_pool (&vpad->info,
+ &pad->priv->conversion_info,
+ pad->priv->converter_config ? gst_structure_copy (pad->
+ priv->converter_config) : NULL, pad->priv->task_pool);
if (!pad->priv->convert) {
GST_WARNING_OBJECT (pad, "No path found for conversion");
return FALSE;
GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s",
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)),
- gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->
- priv->conversion_info)));
+ gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv->
+ conversion_info)));
} else {
GST_DEBUG_OBJECT (pad, "This pad will not need conversion");
}
vaggpad->priv->convert = NULL;
vaggpad->priv->converter_config = NULL;
vaggpad->priv->converter_config_changed = FALSE;
-}
+ vaggpad->priv->task_pool = gst_shared_task_pool_new ();
+ gst_task_pool_prepare (vaggpad->priv->task_pool, NULL);
+}
/**
* gst_video_aggregator_convert_pad_update_conversion_info:
#include <glib.h>
#include <string.h>
#include <math.h>
+#include <gst/base/base.h>
#include "video-orc.h"
typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
-struct _GstParallelizedTaskThread
-{
- GstParallelizedTaskRunner *runner;
- guint idx;
- GThread *thread;
-};
-
struct _GstParallelizedTaskRunner
{
+ GstTaskPool *pool;
+ gboolean own_pool;
guint n_threads;
- GstParallelizedTaskThread *threads;
+ GstQueueArray *tasks;
GstParallelizedTaskFunc func;
gpointer *task_data;
GMutex lock;
- GCond cond_todo, cond_done;
- gint n_todo, n_done;
- gboolean quit;
+ gint n_todo;
};
-static gpointer
+static void
gst_parallelized_task_thread_func (gpointer data)
{
- GstParallelizedTaskThread *self = data;
-
-#if 0
-#ifdef HAVE_PTHREAD
- {
- pthread_t thread = pthread_self ();
- cpu_set_t cpuset;
- int r;
-
- CPU_ZERO (&cpuset);
- CPU_SET (self->idx, &cpuset);
- if ((r = pthread_setaffinity_np (thread, sizeof (cpuset), &cpuset)) != 0)
- GST_ERROR ("Failed to set thread affinity for thread %d: %s", self->idx,
- g_strerror (r));
- }
-#endif
-#endif
-
- g_mutex_lock (&self->runner->lock);
- self->runner->n_done++;
- if (self->runner->n_done == self->runner->n_threads - 1)
- g_cond_signal (&self->runner->cond_done);
-
- do {
- gint idx;
-
- while (self->runner->n_todo == -1 && !self->runner->quit)
- g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
-
- if (self->runner->quit)
- break;
-
- idx = self->runner->n_todo--;
- g_assert (self->runner->n_todo >= -1);
- g_mutex_unlock (&self->runner->lock);
+ GstParallelizedTaskRunner *runner = data;
+ gint idx;
- g_assert (self->runner->func != NULL);
+ g_mutex_lock (&runner->lock);
+ idx = runner->n_todo--;
+ g_assert (runner->n_todo >= -1);
+ g_mutex_unlock (&runner->lock);
- self->runner->func (self->runner->task_data[idx]);
+ g_assert (runner->func != NULL);
- g_mutex_lock (&self->runner->lock);
- self->runner->n_done++;
- if (self->runner->n_done == self->runner->n_threads - 1)
- g_cond_signal (&self->runner->cond_done);
- } while (TRUE);
+ runner->func (runner->task_data[idx]);
+}
- g_mutex_unlock (&self->runner->lock);
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+ gboolean joined = FALSE;
- return NULL;
+ while (!joined) {
+ g_mutex_lock (&self->lock);
+ if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+ gpointer task = gst_queue_array_pop_head (self->tasks);
+ g_mutex_unlock (&self->lock);
+ gst_task_pool_join (self->pool, task);
+ } else {
+ g_mutex_unlock (&self->lock);
+ }
+ }
}
static void
gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
{
- guint i;
-
- g_mutex_lock (&self->lock);
- self->quit = TRUE;
- g_cond_broadcast (&self->cond_todo);
- g_mutex_unlock (&self->lock);
-
- for (i = 1; i < self->n_threads; i++) {
- if (!self->threads[i].thread)
- continue;
-
- g_thread_join (self->threads[i].thread);
- }
+ gst_parallelized_task_runner_join (self);
+ gst_queue_array_free (self->tasks);
+ if (self->own_pool)
+ gst_task_pool_cleanup (self->pool);
+ gst_object_unref (self->pool);
g_mutex_clear (&self->lock);
- g_cond_clear (&self->cond_todo);
- g_cond_clear (&self->cond_done);
- g_free (self->threads);
g_free (self);
}
static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool)
{
GstParallelizedTaskRunner *self;
- guint i;
- GError *err = NULL;
if (n_threads == 0)
n_threads = g_get_num_processors ();
self = g_new0 (GstParallelizedTaskRunner, 1);
+
+ if (pool) {
+ self->pool = g_object_ref (pool);
+ self->own_pool = FALSE;
+
+ /* No reason to split up the work between more threads than the
+ * pool can spawn */
+ if (GST_IS_SHARED_TASK_POOL (pool))
+ n_threads =
+ MIN (n_threads,
+ gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+ } else {
+ self->pool = gst_shared_task_pool_new ();
+ self->own_pool = TRUE;
+ gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+ n_threads);
+ gst_task_pool_prepare (self->pool, NULL);
+ }
+
+ self->tasks = gst_queue_array_new (n_threads);
+
self->n_threads = n_threads;
- self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
- self->quit = FALSE;
self->n_todo = -1;
- self->n_done = 0;
g_mutex_init (&self->lock);
- g_cond_init (&self->cond_todo);
- g_cond_init (&self->cond_done);
/* Set when scheduling a job */
self->func = NULL;
self->task_data = NULL;
- for (i = 0; i < n_threads; i++) {
- self->threads[i].runner = self;
- self->threads[i].idx = i;
-
- /* First thread is the one calling run() */
- if (i > 0) {
- self->threads[i].thread =
- g_thread_try_new ("videoconvert", gst_parallelized_task_thread_func,
- &self->threads[i], &err);
- if (!self->threads[i].thread)
- goto error;
- }
- }
-
- g_mutex_lock (&self->lock);
- while (self->n_done < self->n_threads - 1)
- g_cond_wait (&self->cond_done, &self->lock);
- self->n_done = 0;
- g_mutex_unlock (&self->lock);
-
return self;
-
-error:
- {
- GST_ERROR ("Failed to start thread %u: %s", i, err->message);
- g_clear_error (&err);
-
- gst_parallelized_task_runner_free (self);
- return NULL;
- }
}
static void
self->task_data = task_data;
if (n_threads > 1) {
+ guint i = 0;
g_mutex_lock (&self->lock);
self->n_todo = self->n_threads - 2;
- self->n_done = 0;
- g_cond_broadcast (&self->cond_todo);
+ for (i = 1; i < n_threads; i++) {
+ gpointer task =
+ gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+ self, NULL);
+
+ /* The return value of push() is unfortunately nullable, and we can't deal with that */
+ g_assert (task != NULL);
+ gst_queue_array_push_tail (self->tasks, task);
+ }
g_mutex_unlock (&self->lock);
}
self->func (self->task_data[self->n_threads - 1]);
- if (n_threads > 1) {
- g_mutex_lock (&self->lock);
- while (self->n_done < self->n_threads - 1)
- g_cond_wait (&self->cond_done, &self->lock);
- self->n_done = 0;
- g_mutex_unlock (&self->lock);
- }
+ gst_parallelized_task_runner_join (self);
self->func = NULL;
self->task_data = NULL;
}
/**
- * gst_video_converter_new: (skip)
+ * gst_video_converter_new_with_pool: (skip)
* @in_info: a #GstVideoInfo
* @out_info: a #GstVideoInfo
* @config: (transfer full): a #GstStructure with configuration options
+ * @pool: (nullable): a #GstTaskPool to spawn threads from
*
* Create a new converter object to convert between @in_info and @out_info
* with @config.
*
+ * The optional @pool can be used to spawn threads, this is useful when
+ * creating new converters rapidly, for example when updating cropping.
+ *
* Returns: a #GstVideoConverter or %NULL if conversion is not possible.
*
- * Since: 1.6
+ * Since: 1.20
*/
GstVideoConverter *
-gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
- GstStructure * config)
+gst_video_converter_new_with_pool (GstVideoInfo * in_info,
+ GstVideoInfo * out_info, GstStructure * config, GstTaskPool * pool)
{
GstVideoConverter *convert;
GstLineCache *prev;
if (n_threads < 1)
n_threads = 1;
- convert->conversion_runner = gst_parallelized_task_runner_new (n_threads);
+ convert->conversion_runner =
+ gst_parallelized_task_runner_new (n_threads, pool);
if (video_converter_lookup_fastpath (convert))
goto done;
}
}
+/**
+ * gst_video_converter_new: (skip)
+ * @in_info: a #GstVideoInfo
+ * @out_info: a #GstVideoInfo
+ * @config: (transfer full): a #GstStructure with configuration options
+ *
+ * Create a new converter object to convert between @in_info and @out_info
+ * with @config.
+ *
+ * Returns: a #GstVideoConverter or %NULL if conversion is not possible.
+ *
+ * Since: 1.6
+ */
+GstVideoConverter *
+gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
+ GstStructure * config)
+{
+ return gst_video_converter_new_with_pool (in_info, out_info, config, NULL);
+}
+
static void
clear_matrix_data (MatrixData * data)
{
GST_END_TEST;
+GST_START_TEST (test_video_convert_multithreading)
+{
+ GstVideoInfo ininfo, outinfo;
+ GstVideoFrame inframe, outframe, refframe;
+ GstBuffer *inbuffer, *outbuffer, *refbuffer;
+ GstVideoConverter *convert;
+ GstMapInfo info;
+ GstTaskPool *pool;
+
+ /* Large enough input resolution for video-converter to actually use
+ * 4 threads if required */
+ fail_unless (gst_video_info_set_format (&ininfo, GST_VIDEO_FORMAT_ARGB, 1280,
+ 720));
+ inbuffer = gst_buffer_new_and_alloc (ininfo.size);
+ gst_buffer_memset (inbuffer, 0, 0, -1);
+ gst_video_frame_map (&inframe, &ininfo, inbuffer, GST_MAP_READ);
+
+ fail_unless (gst_video_info_set_format (&outinfo, GST_VIDEO_FORMAT_BGRx, 400,
+ 300));
+ outbuffer = gst_buffer_new_and_alloc (outinfo.size);
+ refbuffer = gst_buffer_new_and_alloc (outinfo.size);
+
+ gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
+ gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
+
+ /* Single threaded-conversion */
+ convert = gst_video_converter_new (&ininfo, &outinfo,
+ gst_structure_new_empty ("options"));
+ gst_video_converter_frame (convert, &inframe, &refframe);
+ gst_video_converter_free (convert);
+
+ /* Multithreaded conversion, converter creates pool */
+ convert = gst_video_converter_new (&ininfo, &outinfo,
+ gst_structure_new ("options",
+ GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL)
+ );
+ gst_video_converter_frame (convert, &inframe, &outframe);
+ gst_video_converter_free (convert);
+
+ gst_video_frame_unmap (&outframe);
+ gst_video_frame_unmap (&refframe);
+
+ gst_buffer_map (outbuffer, &info, GST_MAP_READ);
+ fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
+ gst_buffer_unmap (outbuffer, &info);
+
+ gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
+ gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
+
+ /* Multi-threaded conversion, user-provided pool */
+ pool = gst_shared_task_pool_new ();
+ gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4);
+ gst_task_pool_prepare (pool, NULL);
+ convert = gst_video_converter_new_with_pool (&ininfo, &outinfo,
+ gst_structure_new ("options",
+ GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL), pool);
+ gst_video_converter_frame (convert, &inframe, &outframe);
+ gst_video_converter_free (convert);
+ gst_task_pool_cleanup (pool);
+ gst_object_unref (pool);
+
+ gst_video_frame_unmap (&outframe);
+ gst_video_frame_unmap (&refframe);
+
+ gst_buffer_map (outbuffer, &info, GST_MAP_READ);
+ fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
+ gst_buffer_unmap (outbuffer, &info);
+
+
+ gst_buffer_unref (refbuffer);
+ gst_buffer_unref (outbuffer);
+ gst_video_frame_unmap (&inframe);
+ gst_buffer_unref (inbuffer);
+
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_video_transfer)
{
gint i, j;
tcase_add_test (tc_chain, test_video_color_convert_other);
tcase_add_test (tc_chain, test_video_size_convert);
tcase_add_test (tc_chain, test_video_convert);
+ tcase_add_test (tc_chain, test_video_convert_multithreading);
tcase_add_test (tc_chain, test_video_transfer);
tcase_add_test (tc_chain, test_overlay_blend);
tcase_add_test (tc_chain, test_video_center_rect);