};
G_DEFINE_TYPE (GstCompositorPad, gst_compositor_pad,
- GST_TYPE_VIDEO_AGGREGATOR_CONVERT_PAD);
+ GST_TYPE_VIDEO_AGGREGATOR_PARALLEL_CONVERT_PAD);
static void
gst_compositor_pad_get_property (GObject * object, guint prop_id,
return TRUE;
}
-static gboolean
-gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
+static void
+gst_compositor_pad_prepare_frame_start (GstVideoAggregatorPad * pad,
GstVideoAggregator * vagg, GstBuffer * buffer,
GstVideoFrame * prepared_frame)
{
if (cpad->alpha == 0.0) {
GST_DEBUG_OBJECT (pad, "Pad has alpha 0.0, not converting frame");
- goto done;
+ return;
}
frame_rect = clamp_rectangle (cpad->xpos, cpad->ypos, width, height,
if (frame_rect.w == 0 || frame_rect.h == 0) {
GST_DEBUG_OBJECT (pad, "Resulting frame is zero-width or zero-height "
"(w: %i, h: %i), skipping", frame_rect.w, frame_rect.h);
- goto done;
+ return;
}
GST_OBJECT_LOCK (vagg);
GST_OBJECT_UNLOCK (vagg);
if (frame_obscured)
- goto done;
+ return;
- return
- GST_VIDEO_AGGREGATOR_PAD_CLASS
- (gst_compositor_pad_parent_class)->prepare_frame (pad, vagg, buffer,
+ GST_VIDEO_AGGREGATOR_PAD_CLASS
+ (gst_compositor_pad_parent_class)->prepare_frame_start (pad, vagg, buffer,
prepared_frame);
-
-done:
-
- return TRUE;
}
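Note: GstVideoAggregatorParallelConvertPad (new in GStreamer 1.20) splits frame preparation into a start and a finish phase so the conversion can be scheduled on the aggregator's task pool and collected later; the compositor pad only overrides the start half and returns early, without chaining up, whenever nothing needs to be prepared. For reference (roughly the 1.20 signatures, not part of this patch), the GstVideoAggregatorPadClass vfunc pair is:

void (*prepare_frame_start)  (GstVideoAggregatorPad * pad,
                              GstVideoAggregator * videoaggregator,
                              GstBuffer * buffer,
                              GstVideoFrame * prepared_frame);
void (*prepare_frame_finish) (GstVideoAggregatorPad * pad,
                              GstVideoAggregator * videoaggregator,
                              GstVideoFrame * prepared_frame);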
static void
GST_TYPE_COMPOSITOR_OPERATOR, DEFAULT_PAD_OPERATOR,
G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
- vaggpadclass->prepare_frame =
- GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame);
+ vaggpadclass->prepare_frame_start =
+ GST_DEBUG_FUNCPTR (gst_compositor_pad_prepare_frame_start);
vaggcpadclass->create_conversion_info =
GST_DEBUG_FUNCPTR (gst_compositor_pad_create_conversion_info);
return ret;
}
-static gpointer
+static void
gst_parallelized_task_thread_func (gpointer data)
{
- GstParallelizedTaskThread *self = data;
-
- g_mutex_lock (&self->runner->lock);
- self->runner->n_done++;
- if (self->runner->n_done == self->runner->n_threads - 1)
- g_cond_signal (&self->runner->cond_done);
-
- do {
- gint idx;
-
- while (self->runner->n_todo == -1 && !self->runner->quit)
- g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
-
- if (self->runner->quit)
- break;
+ GstParallelizedTaskRunner *runner = data;
+ gint idx;
- idx = self->runner->n_todo--;
- g_assert (self->runner->n_todo >= -1);
- g_mutex_unlock (&self->runner->lock);
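+ /* each task pushed to the pool claims the next chunk: take an index
+  * under the lock, then run the scheduled function on that chunk's data */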
+ g_mutex_lock (&runner->lock);
+ idx = runner->n_todo--;
+ g_assert (runner->n_todo >= -1);
+ g_mutex_unlock (&runner->lock);
- g_assert (self->runner->func != NULL);
+ g_assert (runner->func != NULL);
- self->runner->func (self->runner->task_data[idx]);
-
- g_mutex_lock (&self->runner->lock);
- self->runner->n_done++;
- if (self->runner->n_done == self->runner->n_threads - 1)
- g_cond_signal (&self->runner->cond_done);
- } while (TRUE);
+ runner->func (runner->task_data[idx]);
+}
- g_mutex_unlock (&self->runner->lock);
+static void
+gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
+{
+ gboolean joined = FALSE;
- return NULL;
+ while (!joined) {
+ g_mutex_lock (&self->lock);
+ if (!(joined = gst_queue_array_is_empty (self->tasks))) {
+ gpointer task = gst_queue_array_pop_head (self->tasks);
+ g_mutex_unlock (&self->lock);
+ gst_task_pool_join (self->pool, task);
+ } else {
+ g_mutex_unlock (&self->lock);
+ }
+ }
}
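Note on the pool API used above: gst_task_pool_push() returns an opaque handle for the queued task and gst_task_pool_join() hands it back; with a GstSharedTaskPool the join waits for that particular task to complete, which is what lets the drain loop above block until all outstanding work is done. A minimal self-contained sketch of that pairing (independent of this patch):

#include <gst/gst.h>

static void
do_work (gpointer user_data)
{
  /* runs on one of the pool's threads */
}

int
main (int argc, char **argv)
{
  GstTaskPool *pool;
  gpointer handle;

  gst_init (&argc, &argv);

  pool = gst_shared_task_pool_new ();
  gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4);
  gst_task_pool_prepare (pool, NULL);

  /* push() hands back an opaque id identifying the queued task */
  handle = gst_task_pool_push (pool, do_work, NULL, NULL);
  if (handle)
    gst_task_pool_join (pool, handle);  /* blocks until do_work() returns */

  gst_task_pool_cleanup (pool);
  gst_object_unref (pool);
  return 0;
}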
static void
gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
{
- guint i;
-
- g_mutex_lock (&self->lock);
- self->quit = TRUE;
- g_cond_broadcast (&self->cond_todo);
- g_mutex_unlock (&self->lock);
-
- for (i = 1; i < self->n_threads; i++) {
- if (!self->threads[i].thread)
- continue;
-
- g_thread_join (self->threads[i].thread);
- }
+ gst_parallelized_task_runner_join (self);
+ gst_queue_array_free (self->tasks);
+ if (self->own_pool)
+ gst_task_pool_cleanup (self->pool);
+ gst_object_unref (self->pool);
g_mutex_clear (&self->lock);
- g_cond_clear (&self->cond_todo);
- g_cond_clear (&self->cond_done);
- g_free (self->threads);
g_free (self);
}
static GstParallelizedTaskRunner *
-gst_parallelized_task_runner_new (guint n_threads)
+gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool,
+ gboolean async_tasks)
{
GstParallelizedTaskRunner *self;
- guint i;
- GError *err = NULL;
if (n_threads == 0)
n_threads = g_get_num_processors ();
self = g_new0 (GstParallelizedTaskRunner, 1);
+
+ if (pool) {
+ self->pool = g_object_ref (pool);
+ self->own_pool = FALSE;
+
+ /* No reason to split up the work between more threads than the
+ * pool can spawn */
+ if (GST_IS_SHARED_TASK_POOL (pool))
+ n_threads =
+ MIN (n_threads,
+ gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
+ } else {
+ self->pool = gst_shared_task_pool_new ();
+ self->own_pool = TRUE;
+ gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
+ n_threads);
+ gst_task_pool_prepare (self->pool, NULL);
+ }
+
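+ /* handles returned by gst_task_pool_push(), drained again in
+  * gst_parallelized_task_runner_join() */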
+ self->tasks = gst_queue_array_new (n_threads);
+
self->n_threads = n_threads;
- self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
- self->quit = FALSE;
self->n_todo = -1;
- self->n_done = 0;
g_mutex_init (&self->lock);
- g_cond_init (&self->cond_todo);
- g_cond_init (&self->cond_done);
/* Set when scheduling a job */
self->func = NULL;
self->task_data = NULL;
-
- for (i = 0; i < n_threads; i++) {
- self->threads[i].runner = self;
- self->threads[i].idx = i;
-
- /* First thread is the one calling run() */
- if (i > 0) {
- self->threads[i].thread =
- g_thread_try_new ("compositor-blend",
- gst_parallelized_task_thread_func, &self->threads[i], &err);
- if (!self->threads[i].thread)
- goto error;
- }
- }
-
- g_mutex_lock (&self->lock);
- while (self->n_done < self->n_threads - 1)
- g_cond_wait (&self->cond_done, &self->lock);
- self->n_done = 0;
- g_mutex_unlock (&self->lock);
+ self->async_tasks = async_tasks;
return self;
+}
-error:
- {
- GST_ERROR ("Failed to start thread %u: %s", i, err->message);
- g_clear_error (&err);
+static void
+gst_parallelized_task_runner_finish (GstParallelizedTaskRunner * self)
+{
+ g_return_if_fail (self->func != NULL);
- gst_parallelized_task_runner_free (self);
- return NULL;
- }
+ gst_parallelized_task_runner_join (self);
+
+ self->func = NULL;
+ self->task_data = NULL;
}
static void
self->func = func;
self->task_data = task_data;
- if (n_threads > 1) {
+ if (n_threads > 1 || self->async_tasks) {
+ guint i = 0;
g_mutex_lock (&self->lock);
- self->n_todo = self->n_threads - 2;
- self->n_done = 0;
- g_cond_broadcast (&self->cond_todo);
+ self->n_todo = self->n_threads - 1;
+ if (!self->async_tasks) {
+ /* if not async, perform one of the functions in the current thread */
+ self->n_todo--;
+ i = 1;
+ }
+ for (; i < n_threads; i++) {
+ gpointer task =
+ gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
+ self, NULL);
+
+ /* The return value of push() is nullable, but with the shared task pool
+  * NULL is only returned when gst_task_pool_prepare() has not been called,
+  * which would be a programming error we should hard-fail on.
+  */
+ g_assert (task != NULL);
+ gst_queue_array_push_tail (self->tasks, task);
+ }
g_mutex_unlock (&self->lock);
}
- self->func (self->task_data[self->n_threads - 1]);
+ if (!self->async_tasks) {
+ self->func (self->task_data[self->n_threads - 1]);
- if (n_threads > 1) {
- g_mutex_lock (&self->lock);
- while (self->n_done < self->n_threads - 1)
- g_cond_wait (&self->cond_done, &self->lock);
- self->n_done = 0;
- g_mutex_unlock (&self->lock);
+ gst_parallelized_task_runner_finish (self);
}
-
- self->func = NULL;
- self->task_data = NULL;
}
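Caller-side view, assuming run() keeps its existing signature taking the per-chunk function and the array of per-chunk task_data pointers (blend_chunk and blend_all are illustrative names, not part of the patch):

static void
blend_chunk (gpointer data)
{
  /* illustrative: blend the slice of the output frame described by data */
}

static void
blend_all (GstParallelizedTaskRunner * runner, gpointer * task_data)
{
  /* With async_tasks == FALSE, run() executes one chunk on the calling
   * thread, pushes the rest to the pool and joins them before returning. */
  gst_parallelized_task_runner_run (runner, blend_chunk, task_data);
}

With async_tasks == TRUE, run() only queues the chunks; the caller has to invoke gst_parallelized_task_runner_finish() before using the blended output.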
static gboolean
_negotiated_caps (GstAggregator * agg, GstCaps * caps)
{
GstCompositor *compositor = GST_COMPOSITOR (agg);
+ GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg);
GstVideoInfo v_info;
guint n_threads;
gst_parallelized_task_runner_free (compositor->blend_runner);
compositor->blend_runner = NULL;
}
- if (!compositor->blend_runner)
- compositor->blend_runner = gst_parallelized_task_runner_new (n_threads);
+ if (!compositor->blend_runner) {
+ GstTaskPool *pool = gst_video_aggregator_get_execution_task_pool (vagg);
+ compositor->blend_runner =
+ gst_parallelized_task_runner_new (n_threads, pool, FALSE);
+ gst_clear_object (&pool);
+ }
return GST_AGGREGATOR_CLASS (parent_class)->negotiated_src_caps (agg, caps);
}
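Note on the last hunk: gst_video_aggregator_get_execution_task_pool() returns a reference owned by the caller, and gst_parallelized_task_runner_new() takes its own ref on the pool it is handed, so the local ref can be dropped right away with gst_clear_object(). Passing FALSE for async_tasks keeps the blend path synchronous: run() will not return until every slice has been blended.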