void ProcessGroup::Work::wait() {
std::unique_lock<std::mutex> lock(mutex_);
- while (!completed_) {
- cv_.wait(lock);
- }
+ cv_.wait(lock, [&] { return completed_; });
if (exception_) {
std::rethrow_exception(exception_);
}
}
void ProcessGroup::Work::finish(std::exception_ptr exception) {
- std::lock_guard<std::mutex> lock(mutex_);
+ std::unique_lock<std::mutex> lock(mutex_);
completed_ = true;
exception_ = exception;
+ lock.unlock();
cv_.notify_all();
}
ProcessGroupGloo::~ProcessGroupGloo() {
std::unique_lock<std::mutex> lock(workMutex_);
- while (!workQueue_.empty()) {
- workConsumeCV_.wait(lock);
- }
+ workConsumeCV_.wait(lock, [&] { return workQueue_.empty(); });
// Queue is empty, signal stop
stop_ = true;
// Release lock to allow threads to terminate
- workProduceCV_.notify_all();
lock.unlock();
+ workProduceCV_.notify_all();
+
// Wait for worker threads to terminate
for (auto& thread : threads_) {
thread.join();
auto work = std::move(workQueue_.front());
workQueue_.pop_front();
- workConsumeCV_.notify_one();
-
workInProgress_[workerIndex] = work;
lock.unlock();
+
+ // Notify after releasing the lock so that the waiter
+ // does not immediately block.
+ workConsumeCV_.notify_one();
+
AsyncWork::execute(std::move(work));
lock.lock();
workInProgress_[workerIndex] = nullptr;
void ProcessGroupGloo::enqueue(std::shared_ptr<AsyncWork> work) {
std::unique_lock<std::mutex> lock(workMutex_);
workQueue_.push_back(std::move(work));
+ lock.unlock();
+
+ // Notify after releasing the lock so that the waiter
+ // does not immediately block.
workProduceCV_.notify_one();
}
void ProcessGroupMPI::destroy() {
std::unique_lock<std::mutex> lock(pgMutex_);
+ queueConsumeCV_.wait(lock, [&] { return queue_.empty(); });
- while (!queue_.empty()) {
- queueConsumeCV_.wait(lock);
- }
// Queue is empty, signal stop
stop_ = true;
// Release lock to allow threads to terminate
- queueProduceCV_.notify_all();
-
lock.unlock();
+ queueProduceCV_.notify_all();
// Join the single worker thread
workerThread_.join();
auto workTuple = std::move(queue_.front());
queue_.pop_front();
- queueConsumeCV_.notify_one();
auto& workEntry = std::get<0>(workTuple);
auto& work = std::get<1>(workTuple);
lock.unlock();
+ queueConsumeCV_.notify_one();
try {
workEntry->run(workEntry);
auto work = std::make_shared<WorkMPI>();
std::unique_lock<std::mutex> lock(pgMutex_);
queue_.push_back(std::make_tuple(std::move(entry), work));
+ lock.unlock();
queueProduceCV_.notify_one();
return work;
}