Change the way threads are started and stopped
authorCyprien Noel <cyprien.noel@gmail.com>
Tue, 28 Apr 2015 21:46:20 +0000 (14:46 -0700)
committerEvan Shelhamer <shelhamer@imaginarynumber.net>
Sun, 9 Aug 2015 22:13:10 +0000 (15:13 -0700)
- Interrupt the thread before waiting on join
- Provide a method for looping threads to exit on demand
- CHECK if start and stop succeed instead of returning an error

include/caffe/internal_thread.hpp
src/caffe/internal_thread.cpp
src/caffe/layers/base_data_layer.cpp
src/caffe/test/test_internal_thread.cpp

index bcff318..be6ff7f 100644 (file)
@@ -25,12 +25,11 @@ class InternalThread {
    * Caffe's thread local state will be initialized using the current
    * thread values, e.g. device id, solver index etc. The random seed
    * is initialized using caffe_rng_rand.
-   * Will not return until the internal thread has exited.
    */
-  bool StartInternalThread();
+  void StartInternalThread();
 
   /** Will not return until the internal thread has exited. */
-  bool WaitForInternalThreadToExit();
+  void StopInternalThread();
 
   bool is_started() const;
 
@@ -39,6 +38,9 @@ class InternalThread {
       with the code you want your thread to run. */
   virtual void InternalThreadEntry() {}
 
+  /* Should be tested when running loops to exit when requested. */
+  bool must_stop();
+
  private:
   void entry(int device, Caffe::Brew mode, int rand_seed);
 
index 2be88b3..d6c2655 100644 (file)
@@ -1,4 +1,5 @@
 #include <boost/thread.hpp>
+#include <exception>
 
 #include "caffe/internal_thread.hpp"
 #include "caffe/util/math_functions.hpp"
@@ -9,18 +10,19 @@ InternalThread::~InternalThread() {
   StopInternalThread();
 }
 
-InternalThread::~InternalThread() {
-  WaitForInternalThreadToExit();
+bool InternalThread::is_started() const {
+  return thread_ && thread_->joinable();
 }
 
-bool InternalThread::is_started() const {
-  return thread_.get() != NULL && thread_->joinable();
+bool InternalThread::must_stop() {
+  return thread_ && thread_->interruption_requested();
 }
 
-bool InternalThread::StartInternalThread() {
-  if (!WaitForInternalThreadToExit()) {
-    return false;
-  }
+void InternalThread::StartInternalThread() {
+  // TODO switch to failing once Caffe prefetch thread is persistent.
+  // Threads should not be started and stopped repeatedly.
+  // CHECK(!is_started());
+  StopInternalThread();
 
   int device = 0;
 #ifndef CPU_ONLY
@@ -32,10 +34,9 @@ bool InternalThread::StartInternalThread() {
   try {
     thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
           rand_seed));
-  } catch (...) {
-    return false;
+  } catch (std::exception& e) {
+    LOG(FATAL) << "Thread exception: " << e.what();
   }
-  return true;
 }
 
 void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
@@ -48,16 +49,16 @@ void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
   InternalThreadEntry();
 }
 
-/** Will not return until the internal thread has exited. */
-bool InternalThread::WaitForInternalThreadToExit() {
+void InternalThread::StopInternalThread() {
   if (is_started()) {
+    thread_->interrupt();
     try {
       thread_->join();
-    } catch (...) {
-      return false;
+    } catch (boost::thread_interrupted&) {
+    } catch (std::exception& e) {
+      LOG(FATAL) << "Thread exception: " << e.what();
     }
   }
-  return true;
 }
 
 }  // namespace caffe
index 26a1118..facaed7 100644 (file)
@@ -47,12 +47,12 @@ void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::CreatePrefetchThread() {
   this->data_transformer_->InitRand();
-  CHECK(StartInternalThread()) << "Thread execution failed";
+  StartInternalThread();
 }
 
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::JoinPrefetchThread() {
-  CHECK(WaitForInternalThreadToExit()) << "Thread joining failed";
+  StopInternalThread();
 }
 
 template <typename Dtype>
index 390c8ed..93f1cc5 100644 (file)
@@ -14,9 +14,9 @@ class InternalThreadTest : public ::testing::Test {};
 TEST_F(InternalThreadTest, TestStartAndExit) {
   InternalThread thread;
   EXPECT_FALSE(thread.is_started());
-  EXPECT_TRUE(thread.StartInternalThread());
+  thread.StartInternalThread();
   EXPECT_TRUE(thread.is_started());
-  EXPECT_TRUE(thread.WaitForInternalThreadToExit());
+  thread.StopInternalThread();
   EXPECT_FALSE(thread.is_started());
 }
 
@@ -35,18 +35,18 @@ class TestThreadB : public InternalThread {
 TEST_F(InternalThreadTest, TestRandomSeed) {
   TestThreadA t1;
   Caffe::set_random_seed(9658361);
-  EXPECT_TRUE(t1.StartInternalThread());
-  EXPECT_TRUE(t1.WaitForInternalThreadToExit());
+  t1.StartInternalThread();
+  t1.StopInternalThread();
 
   TestThreadA t2;
   Caffe::set_random_seed(9658361);
-  EXPECT_TRUE(t2.StartInternalThread());
-  EXPECT_TRUE(t2.WaitForInternalThreadToExit());
+  t2.StartInternalThread();
+  t2.StopInternalThread();
 
   TestThreadB t3;
   Caffe::set_random_seed(3435563);
-  EXPECT_TRUE(t3.StartInternalThread());
-  EXPECT_TRUE(t3.WaitForInternalThreadToExit());
+  t3.StartInternalThread();
+  t3.StopInternalThread();
 }
 
 }  // namespace caffe