From 79ac2120ba7bbc6a14ea9464af3530bc6fc0c8ec Mon Sep 17 00:00:00 2001 From: Owen Anderson Date: Fri, 5 Apr 2019 08:34:41 -0700 Subject: [PATCH] Fix a few instances of notifying on a CV while holding the lock (#18857) Summary: Fix a few instances of notifying on a CV while holding the lock to release the lock before notifying. This avoids an extra thread suspension when the notified thread tries to grab the lock. Pull Request resolved: https://github.com/pytorch/pytorch/pull/18857 Differential Revision: D14779132 Pulled By: resistor fbshipit-source-id: b18a05c4c15be1426ebfdffac1c8f002b771cfd7 --- torch/lib/c10d/ProcessGroup.cpp | 7 +++---- torch/lib/c10d/ProcessGroupGloo.cpp | 18 ++++++++++++------ torch/lib/c10d/ProcessGroupMPI.cpp | 10 ++++------ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/torch/lib/c10d/ProcessGroup.cpp b/torch/lib/c10d/ProcessGroup.cpp index c40f5e4..353ef88 100644 --- a/torch/lib/c10d/ProcessGroup.cpp +++ b/torch/lib/c10d/ProcessGroup.cpp @@ -29,9 +29,7 @@ void ProcessGroup::Work::synchronize() {} void ProcessGroup::Work::wait() { std::unique_lock lock(mutex_); - while (!completed_) { - cv_.wait(lock); - } + cv_.wait(lock, [&] { return completed_; }); if (exception_) { std::rethrow_exception(exception_); } @@ -39,9 +37,10 @@ void ProcessGroup::Work::wait() { } void ProcessGroup::Work::finish(std::exception_ptr exception) { - std::lock_guard lock(mutex_); + std::unique_lock lock(mutex_); completed_ = true; exception_ = exception; + lock.unlock(); cv_.notify_all(); } diff --git a/torch/lib/c10d/ProcessGroupGloo.cpp b/torch/lib/c10d/ProcessGroupGloo.cpp index f0251b4..9a34c80 100644 --- a/torch/lib/c10d/ProcessGroupGloo.cpp +++ b/torch/lib/c10d/ProcessGroupGloo.cpp @@ -290,17 +290,16 @@ ProcessGroupGloo::ProcessGroupGloo( ProcessGroupGloo::~ProcessGroupGloo() { std::unique_lock lock(workMutex_); - while (!workQueue_.empty()) { - workConsumeCV_.wait(lock); - } + workConsumeCV_.wait(lock, [&] { return workQueue_.empty(); }); // Queue is empty, signal stop stop_ = true; // Release lock to allow threads to terminate - workProduceCV_.notify_all(); lock.unlock(); + workProduceCV_.notify_all(); + // Wait for worker threads to terminate for (auto& thread : threads_) { thread.join(); @@ -322,10 +321,13 @@ void ProcessGroupGloo::runLoop(int workerIndex) { auto work = std::move(workQueue_.front()); workQueue_.pop_front(); - workConsumeCV_.notify_one(); - workInProgress_[workerIndex] = work; lock.unlock(); + + // Notify after releasing the lock so that the waiter + // does not immediately block. + workConsumeCV_.notify_one(); + AsyncWork::execute(std::move(work)); lock.lock(); workInProgress_[workerIndex] = nullptr; @@ -335,6 +337,10 @@ void ProcessGroupGloo::runLoop(int workerIndex) { void ProcessGroupGloo::enqueue(std::shared_ptr work) { std::unique_lock lock(workMutex_); workQueue_.push_back(std::move(work)); + lock.unlock(); + + // Notify after releasing the lock so that the waiter + // does not immediately block. workProduceCV_.notify_one(); } diff --git a/torch/lib/c10d/ProcessGroupMPI.cpp b/torch/lib/c10d/ProcessGroupMPI.cpp index c75955a..0c8b3d1 100644 --- a/torch/lib/c10d/ProcessGroupMPI.cpp +++ b/torch/lib/c10d/ProcessGroupMPI.cpp @@ -273,17 +273,14 @@ ProcessGroupMPI::~ProcessGroupMPI() { void ProcessGroupMPI::destroy() { std::unique_lock lock(pgMutex_); + queueConsumeCV_.wait(lock, [&] { return queue_.empty(); }); - while (!queue_.empty()) { - queueConsumeCV_.wait(lock); - } // Queue is empty, signal stop stop_ = true; // Release lock to allow threads to terminate - queueProduceCV_.notify_all(); - lock.unlock(); + queueProduceCV_.notify_all(); // Join the single worker thread workerThread_.join(); @@ -310,12 +307,12 @@ void ProcessGroupMPI::runLoop() { auto workTuple = std::move(queue_.front()); queue_.pop_front(); - queueConsumeCV_.notify_one(); auto& workEntry = std::get<0>(workTuple); auto& work = std::get<1>(workTuple); lock.unlock(); + queueConsumeCV_.notify_one(); try { workEntry->run(workEntry); @@ -333,6 +330,7 @@ std::shared_ptr ProcessGroupMPI::enqueue( auto work = std::make_shared(); std::unique_lock lock(pgMutex_); queue_.push_back(std::make_tuple(std::move(entry), work)); + lock.unlock(); queueProduceCV_.notify_one(); return work; } -- 2.7.4