std::vector<task_sptr> tasks_done;
// This functor is invoked to notify the user of this queue that a
// task has been completed and has been added to the done tasks
- // vector.
- task_done_notify notify;
+ // vector. We call it a notifier. This notifier is the default
+ // notifier of the work queue; the one that is used when the user
+ // has specified no notifier. It basically does nothing.
+ task_done_notify default_notify;
+ // This is a reference to the the notifier that is actually used in
+ // the queue. It's either the one specified by the user or the
+ // default one.
+ task_done_notify& notify;
// A vector of the worker threads.
std::vector<worker> workers;
queue_cond_mutex(),
queue_cond(),
tasks_todo_mutex(),
- tasks_done_mutex()
+ tasks_done_mutex(),
+ notify(default_notify)
{create_workers();}
/// A constructor of @ref queue::priv.
queue_cond_mutex(),
queue_cond(),
tasks_todo_mutex(),
- tasks_done_mutex()
+ tasks_done_mutex(),
+ notify(default_notify)
{create_workers();}
/// A constructor of @ref queue::priv.
/// @param task_done_notify a functor object that is invoked by the
/// worker thread which has performed the task, right after it's
/// added that task to the vector of the done tasks.
- priv(size_t nb_workers, const task_done_notify& n)
+ priv(size_t nb_workers, task_done_notify& n)
: bring_workers_down(),
num_workers(nb_workers),
queue_cond_mutex(),
/// performing the task. When it's done with the task, it goes back
/// to be suspended, waiting for a new task to be scheduled.
///
- /// @param t the task to schedule.
+ /// @param t the task to schedule. Note that a nil task won't be
+ /// scheduled. If the queue is empty, the task @p t won't be
+ /// scheduled either.
+ ///
+ /// @return true iff the task @p t was successfully scheduled.
bool
schedule_task(const task_sptr& t)
{
- if (workers.empty())
+ if (workers.empty() || !t)
return false;
pthread_mutex_lock(&tasks_todo_mutex);
return true;
}
- /// Signal all the threads (of the pool) which are suspended, so
- /// that they wakes up. If there is no task to perform, they just
- /// end their execution. If there are tasks to perform, they finish
- /// them and then end their execution.
+ /// Submit a vector of task to the queue of tasks to be performed.
+ ///
+ /// This wakes up threads of the pool which immediatly start
+ /// performing the tasks. When they are done with the task, they go
+ /// back to be suspended, waiting for new tasks to be scheduled.
+ ///
+ /// @param tasks the tasks to schedule.
+ bool
+ schedule_tasks(const tasks_type& tasks)
+ {
+ bool is_ok= true;
+ for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
+ is_ok &= schedule_task(*t);
+ return is_ok;
+ }
+
+ /// Signal all the threads (of the pool) which are suspended and
+ /// waiting to perform a task, so that they wake up and end up their
+ /// execution. If there is no task to perform, they just end their
+ /// execution. If there are tasks to perform, they finish them and
+ /// then end their execution.
///
/// This function then joins all the tasks of the pool, waiting for
/// them to finish, and then it returns. In other words, this
/// function suspends the thread of the caller, waiting for the
/// worker threads to finish their tasks, and end their execution.
///
- /// If the user wants to work with the thread pool again, she'll
- /// need to create them again, using the member function
+ /// If the user code wants to work with the thread pool again,
+ /// she'll need to create them again, using the member function
/// create_workers().
void
do_bring_workers_down()
if (workers.empty())
return;
- bring_workers_down = true;
-
pthread_mutex_lock(&tasks_todo_mutex);
- if (tasks_todo.empty())
- assert(pthread_cond_broadcast(&queue_cond) == 0);
+ bring_workers_down = true;
pthread_mutex_unlock(&tasks_todo_mutex);
+ // Acquire the mutex that protects the queue condition variable
+ // (queue_cond) and wake up all the workers that are sleeping on
+ // the condition.
+ pthread_mutex_lock(&queue_cond_mutex);
+ assert(pthread_cond_broadcast(&queue_cond) == 0);
+ pthread_mutex_unlock(&queue_cond_mutex);
+
for (std::vector<worker>::const_iterator i = workers.begin();
i != workers.end();
++i)
/// @param number_of_workers the number of worker threads to have in
/// the pool.
///
-/// @param the notifier to invoker when a task is performed. Users
-/// should create a type that inherit this @ref task_done_notify class
-/// and overload its virtual task_done_notify::operator() operator
-/// function.
+/// @param the notifier to invoke when a task is done doing its job.
+/// Users should create a type that inherit this @ref task_done_notify
+/// class and overload its virtual task_done_notify::operator()
+/// operator function. Note that the code of that
+/// task_done_notify::operator() is assured to run in *sequence*, with
+/// respect to the code of other task_done_notify::operator() from
+/// other tasks.
queue::queue(unsigned number_of_workers,
- const task_done_notify& notifier)
+ task_done_notify& notifier)
: p_(new priv(number_of_workers, notifier))
{}
/// performing the task. When it's done with the task, it goes back
/// to be suspended, waiting for a new task to be scheduled.
///
-/// @param t the task to schedule.
+/// @param t the task to schedule. Note that if the queue is empty or
+/// if the task is nil, the task is not scheduled.
+///
+/// @return true iff the task was successfully scheduled.
bool
queue::schedule_task(const task_sptr& t)
{return p_->schedule_task(t);}
+/// Submit a vector of tasks to the queue of tasks to be performed.
+///
+/// This wakes up one or more threads from the pool which immediatly
+/// start performing the tasks. When the threads are done with the
+/// tasks, they goes back to be suspended, waiting for a new task to
+/// be scheduled.
+///
+/// @param tasks the tasks to schedule.
+bool
+queue::schedule_tasks(const tasks_type& tasks)
+{return p_->schedule_tasks(tasks);}
+
/// Suspends the current thread until all worker threads finish
/// performing the tasks they are executing.
///
/// If the worker threads were suspended waiting for a new task to
/// perform, they are woken up and their execution ends.
///
-/// The execution of the current thread is resume when all the threads
-/// of the pool have finished their execution and are terminated.
+/// The execution of the current thread is resumed when all the
+/// threads of the pool have finished their execution and are
+/// terminated.
void
queue::wait_for_workers_to_complete()
{p_->do_bring_workers_down();}
/// Getter of the vector of tasks that got performed.
///
-/// @retun the vector of tasks that got performed.
-const std::vector<task_sptr>&
+/// @return the vector of tasks that got performed.
+std::vector<task_sptr>&
queue::get_completed_tasks() const
{return p_->tasks_done;}
do
{
+ // If there is no more tasks to perform and the queue is not to
+ // be brought down then wait (sleep) for new tasks to come up.
pthread_mutex_lock(&p->queue_cond_mutex);
while (!more_tasks && !p->bring_workers_down)
{
}
pthread_mutex_unlock(&p->queue_cond_mutex);
+ // We were woken up. So maybe there are tasks to perform? If
+ // so, get a task from the queue ...
task_sptr t;
pthread_mutex_lock(&p->tasks_todo_mutex);
if (!p->tasks_todo.empty())
}
pthread_mutex_unlock(&p->tasks_todo_mutex);
+ // If we've got a task to perform then perform it and when it's
+ // done then add to the set of tasks that are done.
if (t)
{
t->perform();
+
+ // Add the task to the vector of tasks that are done and
+ // notify listeners about the fact that the task is done.
+ //
+ // Note that this (including the notification) is not
+ // happening in parallel. So the code performed by the
+ // notifier during the notification is running sequentially,
+ // not in parallel with any other task that was just done
+ // and that is notifying its listeners.
pthread_mutex_lock(&p->tasks_done_mutex);
p->tasks_done.push_back(t);
- pthread_mutex_unlock(&p->tasks_done_mutex);
p->notify(t);
+ pthread_mutex_unlock(&p->tasks_done_mutex);
}
pthread_mutex_lock(&p->tasks_todo_mutex);