get_number_of_threads()
{return sysconf(_SC_NPROCESSORS_ONLN);}
-// <task stuff>
-
-/// The default constructor of the @ref task type
-task::task()
-{}
-
-/// Destructor of the @ref task type.
-task::~task()
-{}
-// </task stuff>
-
-// <worker declarations>
-struct worker;
-
-/// A convenience typedef for a shared_ptr to the @ref worker type.
-typedef shared_ptr<worker> worker_sptr;
-
/// The abstraction of a worker thread.
///
/// This is an implementation detail of the @ref queue public
bool bring_workers_down;
// The number of worker threads.
size_t num_workers;
- // The mutex associated to the queue condition variable below.
- pthread_mutex_t queue_cond_mutex;
+ // A mutex that protects the todo tasks queue from being accessed in
+ // read/write by two threads at the same time.
+ pthread_mutex_t tasks_todo_mutex;
// The queue condition variable. This condition is used to make the
// worker threads sleep until a new task is added to the queue of
// todo tasks. Whenever a new task is added to that queue, a signal
- // is sent to all the threads sleeping on this condition variable,
- // and only one of them wakes up and takes the mutex
- // queue_cond_mutex above.
- pthread_cond_t queue_cond;
- // A mutex that protects the todo tasks queue from being accessed in
- // read/write by two threads at the same time.
- mutable pthread_mutex_t tasks_todo_mutex;
+ // is sent to all a thread sleeping on this condition variable.
+ pthread_cond_t tasks_todo_cond;
// A mutex that protects the done tasks queue from being accessed in
// read/write by two threads at the same time.
pthread_mutex_t tasks_done_mutex;
+ // A condition to be signalled whenever there is a task done. That is being
+ // used to wait for tasks completed when bringing the workers down.
+ pthread_cond_t tasks_done_cond;
// The todo task queue itself.
std::queue<task_sptr> tasks_todo;
// The done task queue itself.
// vector. We call it a notifier. This notifier is the default
// notifier of the work queue; the one that is used when the user
// has specified no notifier. It basically does nothing.
- task_done_notify default_notify;
+ static task_done_notify default_notify;
// This is a reference to the the notifier that is actually used in
// the queue. It's either the one specified by the user or the
// default one.
- task_done_notify& notify;
+ task_done_notify& notify;
// A vector of the worker threads.
std::vector<worker> workers;
- /// The default constructor of @ref queue::priv.
- priv()
- : bring_workers_down(),
- num_workers(get_number_of_threads()),
- queue_cond_mutex(),
- queue_cond(),
- tasks_todo_mutex(),
- tasks_done_mutex(),
- notify(default_notify)
- {create_workers();}
-
- /// A constructor of @ref queue::priv.
- ///
- /// @param nb_workers the number of worker threads to have in the
- /// thread pool.
- priv(size_t nb_workers)
- : bring_workers_down(),
- num_workers(nb_workers),
- queue_cond_mutex(),
- queue_cond(),
- tasks_todo_mutex(),
- tasks_done_mutex(),
- notify(default_notify)
- {create_workers();}
-
/// A constructor of @ref queue::priv.
///
/// @param nb_workers the number of worker threads to have in the
/// @param task_done_notify a functor object that is invoked by the
/// worker thread which has performed the task, right after it's
/// added that task to the vector of the done tasks.
- priv(size_t nb_workers, task_done_notify& n)
+ priv(size_t nb_workers = get_number_of_threads(),
+ task_done_notify& n = default_notify)
: bring_workers_down(),
num_workers(nb_workers),
- queue_cond_mutex(),
- queue_cond(),
tasks_todo_mutex(),
+ tasks_todo_cond(),
tasks_done_mutex(),
+ tasks_done_cond(),
notify(n)
{create_workers();}
- /// Tests if there are more tasks to execute from the task queue.
- ///
- /// @return true iff there are more tasks to execute.
- bool
- more_tasks_to_execute() const
- {
- pthread_mutex_lock(&tasks_todo_mutex);
- bool result = !tasks_todo.empty();
- pthread_mutex_unlock(&tasks_todo_mutex);
- return result;
- }
-
/// Create the worker threads pool and have all threads sit idle,
/// waiting for a task to be added to the todo queue.
void
pthread_mutex_lock(&tasks_todo_mutex);
tasks_todo.push(t);
pthread_mutex_unlock(&tasks_todo_mutex);
-
- pthread_mutex_lock(&queue_cond_mutex);
- pthread_cond_signal(&queue_cond);
- pthread_mutex_unlock(&queue_cond_mutex);
+ pthread_cond_signal(&tasks_todo_cond);
return true;
}
if (workers.empty())
return;
- // Acquire the mutex that protects the queue condition variable
- // (queue_cond) and wake up all the workers that are sleeping on
- // the condition.
- pthread_mutex_lock(&queue_cond_mutex);
+ // Wait for the todo list to be empty to make sure all tasks got picked up
+ pthread_mutex_lock(&tasks_todo_mutex);
+ while (!tasks_todo.empty())
+ pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex);
+
+ pthread_mutex_unlock(&tasks_todo_mutex);
+
+ // Now that the task queue is empty, drain the workers by waking them up,
+ // letting them finish their final task before termination.
bring_workers_down = true;
- ABG_ASSERT(pthread_cond_broadcast(&queue_cond) == 0);
- pthread_mutex_unlock(&queue_cond_mutex);
+ ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0);
for (std::vector<worker>::const_iterator i = workers.begin();
i != workers.end();
}; //end struct queue::priv
+// default initialize the default notifier.
+queue::task_done_notify queue::priv::default_notify;
+
/// Default constructor of the @ref queue type.
///
/// By default the queue is created with a number of worker threaders
queue::priv*
worker::wait_to_execute_a_task(queue::priv* p)
{
-
- pthread_mutex_lock(&p->queue_cond_mutex);
-
do
{
+ pthread_mutex_lock(&p->tasks_todo_mutex);
// If there is no more tasks to perform and the queue is not to
// be brought down then wait (sleep) for new tasks to come up.
- while (!p->more_tasks_to_execute() && !p->bring_workers_down)
- pthread_cond_wait(&p->queue_cond, &p->queue_cond_mutex);
+ while (p->tasks_todo.empty() && !p->bring_workers_down)
+ pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex);
// We were woken up. So maybe there are tasks to perform? If
// so, get a task from the queue ...
task_sptr t;
- pthread_mutex_lock(&p->tasks_todo_mutex);
if (!p->tasks_todo.empty())
{
t = p->tasks_todo.front();
p->tasks_done.push_back(t);
p->notify(t);
pthread_mutex_unlock(&p->tasks_done_mutex);
+ pthread_cond_signal(&p->tasks_done_cond);
}
}
- while (!p->bring_workers_down || p->more_tasks_to_execute());
-
- pthread_mutex_unlock(&p->queue_cond_mutex);
+ while (!p->bring_workers_down);
return p;
}