Fix a few instances of notifying on a CV while holding the lock (#18857)
author Owen Anderson <owen.anderson@oculus.com>
Fri, 5 Apr 2019 15:34:41 +0000 (08:34 -0700)
committer Facebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 5 Apr 2019 15:41:53 +0000 (08:41 -0700)
Summary:
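Fix a few places that notified a condition variable (CV) while still holding the associated lock; they now release the lock before notifying.  This avoids an extra thread suspension: a thread woken while the notifier still holds the lock would immediately block again when it tries to grab the lock.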
Pull Request resolved: https://github.com/pytorch/pytorch/pull/18857

Differential Revision: D14779132

Pulled By: resistor

fbshipit-source-id: b18a05c4c15be1426ebfdffac1c8f002b771cfd7

torch/lib/c10d/ProcessGroup.cpp
torch/lib/c10d/ProcessGroupGloo.cpp
torch/lib/c10d/ProcessGroupMPI.cpp

index c40f5e4..353ef88 100644 (file)
@@ -29,9 +29,7 @@ void ProcessGroup::Work::synchronize() {}
 
 void ProcessGroup::Work::wait() {
   std::unique_lock<std::mutex> lock(mutex_);
-  while (!completed_) {
-    cv_.wait(lock);
-  }
+  cv_.wait(lock, [&] { return completed_; });
   if (exception_) {
     std::rethrow_exception(exception_);
   }
@@ -39,9 +37,10 @@ void ProcessGroup::Work::wait() {
 }
 
 void ProcessGroup::Work::finish(std::exception_ptr exception) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::unique_lock<std::mutex> lock(mutex_);
   completed_ = true;
   exception_ = exception;
+  lock.unlock();
   cv_.notify_all();
 }
 
index f0251b4..9a34c80 100644 (file)
@@ -290,17 +290,16 @@ ProcessGroupGloo::ProcessGroupGloo(
 
 ProcessGroupGloo::~ProcessGroupGloo() {
   std::unique_lock<std::mutex> lock(workMutex_);
-  while (!workQueue_.empty()) {
-    workConsumeCV_.wait(lock);
-  }
+  workConsumeCV_.wait(lock, [&] { return workQueue_.empty(); });
 
   // Queue is empty, signal stop
   stop_ = true;
 
   // Release lock to allow threads to terminate
-  workProduceCV_.notify_all();
   lock.unlock();
 
+  workProduceCV_.notify_all();
+
   // Wait for worker threads to terminate
   for (auto& thread : threads_) {
     thread.join();
@@ -322,10 +321,13 @@ void ProcessGroupGloo::runLoop(int workerIndex) {
 
     auto work = std::move(workQueue_.front());
     workQueue_.pop_front();
-    workConsumeCV_.notify_one();
-
     workInProgress_[workerIndex] = work;
     lock.unlock();
+
+    // Notify after releasing the lock so that the waiter
+    // does not immediately block.
+    workConsumeCV_.notify_one();
+
     AsyncWork::execute(std::move(work));
     lock.lock();
     workInProgress_[workerIndex] = nullptr;
@@ -335,6 +337,10 @@ void ProcessGroupGloo::runLoop(int workerIndex) {
 void ProcessGroupGloo::enqueue(std::shared_ptr<AsyncWork> work) {
   std::unique_lock<std::mutex> lock(workMutex_);
   workQueue_.push_back(std::move(work));
+  lock.unlock();
+
+  // Notify after releasing the lock so that the waiter
+  // does not immediately block.
   workProduceCV_.notify_one();
 }
 
index c75955a..0c8b3d1 100644 (file)
@@ -273,17 +273,14 @@ ProcessGroupMPI::~ProcessGroupMPI() {
 
 void ProcessGroupMPI::destroy() {
   std::unique_lock<std::mutex> lock(pgMutex_);
+  queueConsumeCV_.wait(lock, [&] { return queue_.empty(); });
 
-  while (!queue_.empty()) {
-    queueConsumeCV_.wait(lock);
-  }
   // Queue is empty, signal stop
   stop_ = true;
 
   // Release lock to allow threads to terminate
-  queueProduceCV_.notify_all();
-
   lock.unlock();
+  queueProduceCV_.notify_all();
 
   // Join the single worker thread
   workerThread_.join();
@@ -310,12 +307,12 @@ void ProcessGroupMPI::runLoop() {
     auto workTuple = std::move(queue_.front());
 
     queue_.pop_front();
-    queueConsumeCV_.notify_one();
 
     auto& workEntry = std::get<0>(workTuple);
     auto& work = std::get<1>(workTuple);
 
     lock.unlock();
+    queueConsumeCV_.notify_one();
 
     try {
       workEntry->run(workEntry);
@@ -333,6 +330,7 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupMPI::enqueue(
   auto work = std::make_shared<WorkMPI>();
   std::unique_lock<std::mutex> lock(pgMutex_);
   queue_.push_back(std::make_tuple(std::move(entry), work));
+  lock.unlock();
   queueProduceCV_.notify_one();
   return work;
 }