From: Matthias Maennich Date: Tue, 4 Feb 2020 13:05:49 +0000 (+0000) Subject: abg-workers: Rework the worker queue to improve concurrent behaviour X-Git-Tag: upstream/1.7~10 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=91a4274c9c676c7479924a958dfa92021207dbf3;p=platform%2Fupstream%2Flibabigail.git abg-workers: Rework the worker queue to improve concurrent behaviour This patch refactors the abigail::workers::queue and abigail::workers::worker implementations to avoid holding locking primitives longer than necessary. In particular, the queue_cond_mutex was held during the entiry worker runtime, effectively serializing the workers. Hence, use a mutex+cond pair for each, the input and output queue and only synchronize around the interaction with their corresponding queues. The tasks_todo_(mutex|cond) are meant to synchronize scheduling and distribution of work among workers, while tasks_done_(mutex|cond) are used for synchronizing threads when putting back the tasks to the output queue and to hold back threads waiting for the queue and workers to drain. Along that way, I did some cleanup that was now possible. - Move entire implementation of abigail::workers::task into header. - Make default_notify a static member. - Replace the multiple constructors with one with default arguments. * include/abg-workers.h (workers::task): move entire implementation to header and drop superfluous forward declaration. * src/abg-workers.cc (workers::task):: Likewise. (workers::queue::priv): Drop queue_cond_mutex, rename queue_cond to tasks_todo_cond, add task_done_cond, make default_notify static. (workers::queue::priv::priv): Add default arguments to fully qualified constructor, drop the remaining ones. (workers::queue:prive::more_tasks_to_execute): Drop method. (workers::queue:prive::schedule_task): Do not synchronize access to the queue condition variable, but only on the mutex. (do_bring_workers_down): Likewise. Also await tasks_done to be empty. (workers::queue:prive::worker::wait_to_execute_a_task): Await tasks on the tasks_todo with tasks_todo_(cond|mutex) and signal task completion to tasks_done_cond. Signed-off-by: Matthias Maennich Signed-off-by: Dodji Seketeli --- diff --git a/include/abg-workers.h b/include/abg-workers.h index 2892837b..ccd547a8 100644 --- a/include/abg-workers.h +++ b/include/abg-workers.h @@ -45,9 +45,6 @@ namespace abigail namespace workers { -class task; -typedef shared_ptr task_sptr; - size_t get_number_of_threads(); /// This represents a task to be performed. @@ -60,13 +57,14 @@ size_t get_number_of_threads(); class task { public: - task(); virtual void perform() = 0; - virtual ~task(); + virtual ~task(){}; }; // end class task. +typedef shared_ptr task_sptr; + /// This represents a queue of tasks to be performed. /// /// Tasks are performed by a number of worker threads. diff --git a/src/abg-workers.cc b/src/abg-workers.cc index 4771a205..5e569cd7 100644 --- a/src/abg-workers.cc +++ b/src/abg-workers.cc @@ -90,23 +90,6 @@ size_t get_number_of_threads() {return sysconf(_SC_NPROCESSORS_ONLN);} -// - -/// The default constructor of the @ref task type -task::task() -{} - -/// Destructor of the @ref task type. -task::~task() -{} -// - -// -struct worker; - -/// A convenience typedef for a shared_ptr to the @ref worker type. -typedef shared_ptr worker_sptr; - /// The abstraction of a worker thread. /// /// This is an implementation detail of the @ref queue public @@ -135,21 +118,20 @@ struct queue::priv bool bring_workers_down; // The number of worker threads. size_t num_workers; - // The mutex associated to the queue condition variable below. - pthread_mutex_t queue_cond_mutex; + // A mutex that protects the todo tasks queue from being accessed in + // read/write by two threads at the same time. + pthread_mutex_t tasks_todo_mutex; // The queue condition variable. This condition is used to make the // worker threads sleep until a new task is added to the queue of // todo tasks. Whenever a new task is added to that queue, a signal - // is sent to all the threads sleeping on this condition variable, - // and only one of them wakes up and takes the mutex - // queue_cond_mutex above. - pthread_cond_t queue_cond; - // A mutex that protects the todo tasks queue from being accessed in - // read/write by two threads at the same time. - mutable pthread_mutex_t tasks_todo_mutex; + // is sent to all a thread sleeping on this condition variable. + pthread_cond_t tasks_todo_cond; // A mutex that protects the done tasks queue from being accessed in // read/write by two threads at the same time. pthread_mutex_t tasks_done_mutex; + // A condition to be signalled whenever there is a task done. That is being + // used to wait for tasks completed when bringing the workers down. + pthread_cond_t tasks_done_cond; // The todo task queue itself. std::queue tasks_todo; // The done task queue itself. @@ -159,39 +141,14 @@ struct queue::priv // vector. We call it a notifier. This notifier is the default // notifier of the work queue; the one that is used when the user // has specified no notifier. It basically does nothing. - task_done_notify default_notify; + static task_done_notify default_notify; // This is a reference to the the notifier that is actually used in // the queue. It's either the one specified by the user or the // default one. - task_done_notify& notify; + task_done_notify& notify; // A vector of the worker threads. std::vector workers; - /// The default constructor of @ref queue::priv. - priv() - : bring_workers_down(), - num_workers(get_number_of_threads()), - queue_cond_mutex(), - queue_cond(), - tasks_todo_mutex(), - tasks_done_mutex(), - notify(default_notify) - {create_workers();} - - /// A constructor of @ref queue::priv. - /// - /// @param nb_workers the number of worker threads to have in the - /// thread pool. - priv(size_t nb_workers) - : bring_workers_down(), - num_workers(nb_workers), - queue_cond_mutex(), - queue_cond(), - tasks_todo_mutex(), - tasks_done_mutex(), - notify(default_notify) - {create_workers();} - /// A constructor of @ref queue::priv. /// /// @param nb_workers the number of worker threads to have in the @@ -200,28 +157,17 @@ struct queue::priv /// @param task_done_notify a functor object that is invoked by the /// worker thread which has performed the task, right after it's /// added that task to the vector of the done tasks. - priv(size_t nb_workers, task_done_notify& n) + priv(size_t nb_workers = get_number_of_threads(), + task_done_notify& n = default_notify) : bring_workers_down(), num_workers(nb_workers), - queue_cond_mutex(), - queue_cond(), tasks_todo_mutex(), + tasks_todo_cond(), tasks_done_mutex(), + tasks_done_cond(), notify(n) {create_workers();} - /// Tests if there are more tasks to execute from the task queue. - /// - /// @return true iff there are more tasks to execute. - bool - more_tasks_to_execute() const - { - pthread_mutex_lock(&tasks_todo_mutex); - bool result = !tasks_todo.empty(); - pthread_mutex_unlock(&tasks_todo_mutex); - return result; - } - /// Create the worker threads pool and have all threads sit idle, /// waiting for a task to be added to the todo queue. void @@ -258,10 +204,7 @@ struct queue::priv pthread_mutex_lock(&tasks_todo_mutex); tasks_todo.push(t); pthread_mutex_unlock(&tasks_todo_mutex); - - pthread_mutex_lock(&queue_cond_mutex); - pthread_cond_signal(&queue_cond); - pthread_mutex_unlock(&queue_cond_mutex); + pthread_cond_signal(&tasks_todo_cond); return true; } @@ -301,13 +244,17 @@ struct queue::priv if (workers.empty()) return; - // Acquire the mutex that protects the queue condition variable - // (queue_cond) and wake up all the workers that are sleeping on - // the condition. - pthread_mutex_lock(&queue_cond_mutex); + // Wait for the todo list to be empty to make sure all tasks got picked up + pthread_mutex_lock(&tasks_todo_mutex); + while (!tasks_todo.empty()) + pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex); + + pthread_mutex_unlock(&tasks_todo_mutex); + + // Now that the task queue is empty, drain the workers by waking them up, + // letting them finish their final task before termination. bring_workers_down = true; - ABG_ASSERT(pthread_cond_broadcast(&queue_cond) == 0); - pthread_mutex_unlock(&queue_cond_mutex); + ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0); for (std::vector::const_iterator i = workers.begin(); i != workers.end(); @@ -322,6 +269,9 @@ struct queue::priv }; //end struct queue::priv +// default initialize the default notifier. +queue::task_done_notify queue::priv::default_notify; + /// Default constructor of the @ref queue type. /// /// By default the queue is created with a number of worker threaders @@ -438,20 +388,17 @@ queue::task_done_notify::operator()(const task_sptr&/*task_done*/) queue::priv* worker::wait_to_execute_a_task(queue::priv* p) { - - pthread_mutex_lock(&p->queue_cond_mutex); - do { + pthread_mutex_lock(&p->tasks_todo_mutex); // If there is no more tasks to perform and the queue is not to // be brought down then wait (sleep) for new tasks to come up. - while (!p->more_tasks_to_execute() && !p->bring_workers_down) - pthread_cond_wait(&p->queue_cond, &p->queue_cond_mutex); + while (p->tasks_todo.empty() && !p->bring_workers_down) + pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex); // We were woken up. So maybe there are tasks to perform? If // so, get a task from the queue ... task_sptr t; - pthread_mutex_lock(&p->tasks_todo_mutex); if (!p->tasks_todo.empty()) { t = p->tasks_todo.front(); @@ -477,11 +424,10 @@ worker::wait_to_execute_a_task(queue::priv* p) p->tasks_done.push_back(t); p->notify(t); pthread_mutex_unlock(&p->tasks_done_mutex); + pthread_cond_signal(&p->tasks_done_cond); } } - while (!p->bring_workers_down || p->more_tasks_to_execute()); - - pthread_mutex_unlock(&p->queue_cond_mutex); + while (!p->bring_workers_down); return p; }