* Caffe's thread local state will be initialized using the current
* thread values, e.g. device id, solver index etc. The random seed
* is initialized using caffe_rng_rand.
- * Will not return until the internal thread has exited.
*/
- bool StartInternalThread();
+ void StartInternalThread();
/** Will not return until the internal thread has exited. */
- bool WaitForInternalThreadToExit();
+ void StopInternalThread();
bool is_started() const;
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}
+ /* Should be tested when running loops to exit when requested. */
+ bool must_stop();
+
private:
void entry(int device, Caffe::Brew mode, int rand_seed);
#include <boost/thread.hpp>
+#include <exception>
#include "caffe/internal_thread.hpp"
#include "caffe/util/math_functions.hpp"
StopInternalThread();
}
-InternalThread::~InternalThread() {
- WaitForInternalThreadToExit();
+bool InternalThread::is_started() const {
+ return thread_ && thread_->joinable();
}
-bool InternalThread::is_started() const {
- return thread_.get() != NULL && thread_->joinable();
+bool InternalThread::must_stop() {
+ return thread_ && thread_->interruption_requested();
}
-bool InternalThread::StartInternalThread() {
- if (!WaitForInternalThreadToExit()) {
- return false;
- }
+void InternalThread::StartInternalThread() {
+ // TODO switch to failing once Caffe prefetch thread is persistent.
+ // Threads should not be started and stopped repeatedly.
+ // CHECK(!is_started());
+ StopInternalThread();
int device = 0;
#ifndef CPU_ONLY
try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
rand_seed));
- } catch (...) {
- return false;
+ } catch (std::exception& e) {
+ LOG(FATAL) << "Thread exception: " << e.what();
}
- return true;
}
void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
InternalThreadEntry();
}
-/** Will not return until the internal thread has exited. */
-bool InternalThread::WaitForInternalThreadToExit() {
+void InternalThread::StopInternalThread() {
if (is_started()) {
+ thread_->interrupt();
try {
thread_->join();
- } catch (...) {
- return false;
+ } catch (boost::thread_interrupted&) {
+ } catch (std::exception& e) {
+ LOG(FATAL) << "Thread exception: " << e.what();
}
}
- return true;
}
} // namespace caffe
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::CreatePrefetchThread() {
this->data_transformer_->InitRand();
- CHECK(StartInternalThread()) << "Thread execution failed";
+ StartInternalThread();
}
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::JoinPrefetchThread() {
- CHECK(WaitForInternalThreadToExit()) << "Thread joining failed";
+ StopInternalThread();
}
template <typename Dtype>
TEST_F(InternalThreadTest, TestStartAndExit) {
InternalThread thread;
EXPECT_FALSE(thread.is_started());
- EXPECT_TRUE(thread.StartInternalThread());
+ thread.StartInternalThread();
EXPECT_TRUE(thread.is_started());
- EXPECT_TRUE(thread.WaitForInternalThreadToExit());
+ thread.StopInternalThread();
EXPECT_FALSE(thread.is_started());
}
TEST_F(InternalThreadTest, TestRandomSeed) {
TestThreadA t1;
Caffe::set_random_seed(9658361);
- EXPECT_TRUE(t1.StartInternalThread());
- EXPECT_TRUE(t1.WaitForInternalThreadToExit());
+ t1.StartInternalThread();
+ t1.StopInternalThread();
TestThreadA t2;
Caffe::set_random_seed(9658361);
- EXPECT_TRUE(t2.StartInternalThread());
- EXPECT_TRUE(t2.WaitForInternalThreadToExit());
+ t2.StartInternalThread();
+ t2.StopInternalThread();
TestThreadB t3;
Caffe::set_random_seed(3435563);
- EXPECT_TRUE(t3.StartInternalThread());
- EXPECT_TRUE(t3.WaitForInternalThreadToExit());
+ t3.StartInternalThread();
+ t3.StopInternalThread();
}
} // namespace caffe