From 57d7ecbc464c36cbf635ebb8bc510b003f7944f1 Mon Sep 17 00:00:00 2001 From: TANGUY Arnaud Date: Wed, 16 Jul 2014 15:20:15 +0200 Subject: [PATCH] Cleanup pthread code for data layers Get rid of the ugly "friend" thread function. Simplify the creation of the thread by inheriting from a base class. Thus, defining the thread is as simple as overriding a virtual function. --- include/caffe/data_layers.hpp | 36 ++++----------- include/caffe/internal_thread.hpp | 46 +++++++++++++++++++ src/caffe/layers/data_layer.cpp | 83 ++++++++++++++++------------------ src/caffe/layers/image_data_layer.cpp | 60 +++++++++++------------- src/caffe/layers/window_data_layer.cpp | 56 ++++++++++------------- src/caffe/test/test_power_layer.cpp | 2 - 6 files changed, 144 insertions(+), 139 deletions(-) create mode 100644 include/caffe/internal_thread.hpp diff --git a/include/caffe/data_layers.hpp b/include/caffe/data_layers.hpp index 1bb6bce..107d780 100644 --- a/include/caffe/data_layers.hpp +++ b/include/caffe/data_layers.hpp @@ -9,7 +9,6 @@ #include "leveldb/db.h" #include "lmdb.h" -#include "pthread.h" #include "hdf5.h" #include "boost/scoped_ptr.hpp" @@ -18,6 +17,7 @@ #include "caffe/filler.hpp" #include "caffe/layer.hpp" #include "caffe/proto/caffe.pb.h" +#include "caffe/internal_thread.hpp" namespace caffe { @@ -27,15 +27,8 @@ namespace caffe { // TODO: DataLayer, ImageDataLayer, and WindowDataLayer all have the // same basic structure and a lot of duplicated code. -// This function is used to create a pthread that prefetches the data. template -void* DataLayerPrefetch(void* layer_pointer); - -template -class DataLayer : public Layer { - // The function used to perform prefetching. 
- friend void* DataLayerPrefetch(void* layer_pointer); - +class DataLayer : public Layer, public InternalThread { public: explicit DataLayer(const LayerParameter& param) : Layer(param) {} @@ -63,6 +56,8 @@ class DataLayer : public Layer { virtual void CreatePrefetchThread(); virtual void JoinPrefetchThread(); virtual unsigned int PrefetchRand(); + // The thread's function + virtual void InternalThreadEntry(); shared_ptr prefetch_rng_; @@ -80,7 +75,6 @@ class DataLayer : public Layer { int datum_height_; int datum_width_; int datum_size_; - pthread_t thread_; Blob prefetch_data_; Blob prefetch_label_; Blob data_mean_; @@ -182,15 +176,8 @@ class HDF5OutputLayer : public Layer { Blob label_blob_; }; -// This function is used to create a pthread that prefetches the data. -template -void* ImageDataLayerPrefetch(void* layer_pointer); - template -class ImageDataLayer : public Layer { - // The function used to perform prefetching. - friend void* ImageDataLayerPrefetch(void* layer_pointer); - +class ImageDataLayer : public Layer, public InternalThread { public: explicit ImageDataLayer(const LayerParameter& param) : Layer(param) {} @@ -219,6 +206,7 @@ class ImageDataLayer : public Layer { virtual void CreatePrefetchThread(); virtual void JoinPrefetchThread(); virtual unsigned int PrefetchRand(); + virtual void InternalThreadEntry(); shared_ptr prefetch_rng_; vector > lines_; @@ -227,7 +215,6 @@ class ImageDataLayer : public Layer { int datum_height_; int datum_width_; int datum_size_; - pthread_t thread_; Blob prefetch_data_; Blob prefetch_label_; Blob data_mean_; @@ -277,15 +264,8 @@ class MemoryDataLayer : public Layer { int pos_; }; -// This function is used to create a pthread that prefetches the window data. -template -void* WindowDataLayerPrefetch(void* layer_pointer); - template -class WindowDataLayer : public Layer { - // The function used to perform prefetching. 
- friend void* WindowDataLayerPrefetch(void* layer_pointer); - +class WindowDataLayer : public Layer<Dtype>, public InternalThread { public: explicit WindowDataLayer(const LayerParameter& param) : Layer<Dtype>(param) {} @@ -312,9 +292,9 @@ class WindowDataLayer : public Layer<Dtype> { virtual void CreatePrefetchThread(); virtual void JoinPrefetchThread(); virtual unsigned int PrefetchRand(); + virtual void InternalThreadEntry(); shared_ptr<Caffe::RNG> prefetch_rng_; - pthread_t thread_; Blob<Dtype> prefetch_data_; Blob<Dtype> prefetch_label_; Blob<Dtype> data_mean_; diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp new file mode 100644 index 0000000..46f6736 --- /dev/null +++ b/include/caffe/internal_thread.hpp @@ -0,0 +1,46 @@ +// Copyright 2014 BVLC and contributors. + +#ifndef CAFFE_INTERNAL_THREAD_HPP_ +#define CAFFE_INTERNAL_THREAD_HPP_ + +#include <pthread.h> + +namespace caffe { + +/** + * Virtual class encapsulating a pthread for use in a base class. + * The child class will acquire the ability to run a single pthread, + * by reimplementing the virtual function InternalThreadEntry. + */ +class InternalThread { + public: + InternalThread() {} + virtual ~InternalThread() {} + + /** Returns false (0) if the thread was successfully started, true otherwise + (pthread_create returns 0 on success). **/ + bool StartInternalThread() { + return pthread_create(&_thread, NULL, InternalThreadEntryFunc, this); + } + + /** Will not return until the internal thread has exited. */ + bool WaitForInternalThreadToExit() { + return pthread_join(_thread, NULL); + } + + protected: + /* Implement this method in your subclass with the code you want your thread to run.
*/ + virtual void InternalThreadEntry() = 0; + + private: + static void * InternalThreadEntryFunc(void * This) { + reinterpret_cast(This)->InternalThreadEntry(); + return NULL; + } + + pthread_t _thread; +}; + +} // namespace caffe + +#endif diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp index b6afaa9..ef27c1a 100644 --- a/src/caffe/layers/data_layer.cpp +++ b/src/caffe/layers/data_layer.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -16,46 +15,44 @@ namespace caffe { +// This function is used to create a pthread that prefetches the data. template -void* DataLayerPrefetch(void* layer_pointer) { - CHECK(layer_pointer); - DataLayer* layer = static_cast*>(layer_pointer); - CHECK(layer); +void DataLayer::InternalThreadEntry() { Datum datum; - CHECK(layer->prefetch_data_.count()); - Dtype* top_data = layer->prefetch_data_.mutable_cpu_data(); + CHECK(prefetch_data_.count()); + Dtype* top_data = prefetch_data_.mutable_cpu_data(); Dtype* top_label = NULL; // suppress warnings about uninitialized variables - if (layer->output_labels_) { - top_label = layer->prefetch_label_.mutable_cpu_data(); + if (output_labels_) { + top_label = prefetch_label_.mutable_cpu_data(); } - const Dtype scale = layer->layer_param_.data_param().scale(); - const int batch_size = layer->layer_param_.data_param().batch_size(); - const int crop_size = layer->layer_param_.data_param().crop_size(); - const bool mirror = layer->layer_param_.data_param().mirror(); + const Dtype scale = this->layer_param_.data_param().scale(); + const int batch_size = this->layer_param_.data_param().batch_size(); + const int crop_size = this->layer_param_.data_param().crop_size(); + const bool mirror = this->layer_param_.data_param().mirror(); if (mirror && crop_size == 0) { LOG(FATAL) << "Current implementation requires mirror and crop_size to be " << "set at the same time."; } // datum scales - const int channels = layer->datum_channels_; - const int height = 
layer->datum_height_; - const int width = layer->datum_width_; - const int size = layer->datum_size_; - const Dtype* mean = layer->data_mean_.cpu_data(); + const int channels = datum_channels_; + const int height = datum_height_; + const int width = datum_width_; + const int size = datum_size_; + const Dtype* mean = data_mean_.cpu_data(); for (int item_id = 0; item_id < batch_size; ++item_id) { // get a blob - switch (layer->layer_param_.data_param().backend()) { + switch (this->layer_param_.data_param().backend()) { case DataParameter_DB_LEVELDB: - CHECK(layer->iter_); - CHECK(layer->iter_->Valid()); - datum.ParseFromString(layer->iter_->value().ToString()); + CHECK(iter_); + CHECK(iter_->Valid()); + datum.ParseFromString(iter_->value().ToString()); break; case DataParameter_DB_LMDB: - CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_, - &layer->mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS); - datum.ParseFromArray(layer->mdb_value_.mv_data, - layer->mdb_value_.mv_size); + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS); + datum.ParseFromArray(mdb_value_.mv_data, + mdb_value_.mv_size); break; default: LOG(FATAL) << "Unknown database backend"; @@ -66,14 +63,14 @@ void* DataLayerPrefetch(void* layer_pointer) { CHECK(data.size()) << "Image cropping only support uint8 data"; int h_off, w_off; // We only do random crop when we do training. 
- if (layer->phase_ == Caffe::TRAIN) { - h_off = layer->PrefetchRand() % (height - crop_size); - w_off = layer->PrefetchRand() % (width - crop_size); + if (phase_ == Caffe::TRAIN) { + h_off = PrefetchRand() % (height - crop_size); + w_off = PrefetchRand() % (width - crop_size); } else { h_off = (height - crop_size) / 2; w_off = (width - crop_size) / 2; } - if (mirror && layer->PrefetchRand() % 2) { + if (mirror && PrefetchRand() % 2) { // Copy mirrored version for (int c = 0; c < channels; ++c) { for (int h = 0; h < crop_size; ++h) { @@ -118,34 +115,32 @@ void* DataLayerPrefetch(void* layer_pointer) { } } - if (layer->output_labels_) { + if (output_labels_) { top_label[item_id] = datum.label(); } // go to the next iter - switch (layer->layer_param_.data_param().backend()) { + switch (this->layer_param_.data_param().backend()) { case DataParameter_DB_LEVELDB: - layer->iter_->Next(); - if (!layer->iter_->Valid()) { + iter_->Next(); + if (!iter_->Valid()) { // We have reached the end. Restart from the first. DLOG(INFO) << "Restarting data prefetching from start."; - layer->iter_->SeekToFirst(); + iter_->SeekToFirst(); } break; case DataParameter_DB_LMDB: - if (mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_, - &layer->mdb_value_, MDB_NEXT) != MDB_SUCCESS) { + if (mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_NEXT) != MDB_SUCCESS) { // We have reached the end. Restart from the first. DLOG(INFO) << "Restarting data prefetching from start."; - CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_, - &layer->mdb_value_, MDB_FIRST), MDB_SUCCESS); + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_FIRST), MDB_SUCCESS); } break; default: LOG(FATAL) << "Unknown database backend"; } } - - return static_cast(NULL); } template @@ -323,14 +318,12 @@ void DataLayer::CreatePrefetchThread() { } else { prefetch_rng_.reset(); } - // Create the thread. 
- CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch, - static_cast(this))) << "Pthread execution failed."; + CHECK(!StartInternalThread()) << "Pthread execution failed"; } template void DataLayer::JoinPrefetchThread() { - CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed."; + CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed"; } template diff --git a/src/caffe/layers/image_data_layer.cpp b/src/caffe/layers/image_data_layer.cpp index 30050dd..1ba2a3b 100644 --- a/src/caffe/layers/image_data_layer.cpp +++ b/src/caffe/layers/image_data_layer.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -18,17 +17,14 @@ namespace caffe { +// This function is used to create a pthread that prefetches the data. template -void* ImageDataLayerPrefetch(void* layer_pointer) { - CHECK(layer_pointer); - ImageDataLayer* layer = - reinterpret_cast*>(layer_pointer); - CHECK(layer); +void ImageDataLayer::InternalThreadEntry() { Datum datum; - CHECK(layer->prefetch_data_.count()); - Dtype* top_data = layer->prefetch_data_.mutable_cpu_data(); - Dtype* top_label = layer->prefetch_label_.mutable_cpu_data(); - ImageDataParameter image_data_param = layer->layer_param_.image_data_param(); + CHECK(prefetch_data_.count()); + Dtype* top_data = prefetch_data_.mutable_cpu_data(); + Dtype* top_label = prefetch_label_.mutable_cpu_data(); + ImageDataParameter image_data_param = this->layer_param_.image_data_param(); const Dtype scale = image_data_param.scale(); const int batch_size = image_data_param.batch_size(); const int crop_size = image_data_param.crop_size(); @@ -41,17 +37,17 @@ void* ImageDataLayerPrefetch(void* layer_pointer) { << "set at the same time."; } // datum scales - const int channels = layer->datum_channels_; - const int height = layer->datum_height_; - const int width = layer->datum_width_; - const int size = layer->datum_size_; - const int lines_size = layer->lines_.size(); - const Dtype* mean = layer->data_mean_.cpu_data(); + const int 
channels = datum_channels_; + const int height = datum_height_; + const int width = datum_width_; + const int size = datum_size_; + const int lines_size = lines_.size(); + const Dtype* mean = data_mean_.cpu_data(); for (int item_id = 0; item_id < batch_size; ++item_id) { // get a blob - CHECK_GT(lines_size, layer->lines_id_); - if (!ReadImageToDatum(layer->lines_[layer->lines_id_].first, - layer->lines_[layer->lines_id_].second, + CHECK_GT(lines_size, lines_id_); + if (!ReadImageToDatum(lines_[lines_id_].first, + lines_[lines_id_].second, new_height, new_width, &datum)) { continue; } @@ -60,14 +56,14 @@ void* ImageDataLayerPrefetch(void* layer_pointer) { CHECK(data.size()) << "Image cropping only support uint8 data"; int h_off, w_off; // We only do random crop when we do training. - if (layer->phase_ == Caffe::TRAIN) { - h_off = layer->PrefetchRand() % (height - crop_size); - w_off = layer->PrefetchRand() % (width - crop_size); + if (phase_ == Caffe::TRAIN) { + h_off = PrefetchRand() % (height - crop_size); + w_off = PrefetchRand() % (width - crop_size); } else { h_off = (height - crop_size) / 2; w_off = (width - crop_size) / 2; } - if (mirror && layer->PrefetchRand() % 2) { + if (mirror && PrefetchRand() % 2) { // Copy mirrored version for (int c = 0; c < channels; ++c) { for (int h = 0; h < crop_size; ++h) { @@ -114,18 +110,16 @@ void* ImageDataLayerPrefetch(void* layer_pointer) { top_label[item_id] = datum.label(); // go to the next iter - layer->lines_id_++; - if (layer->lines_id_ >= lines_size) { + lines_id_++; + if (lines_id_ >= lines_size) { // We have reached the end. Restart from the first. 
DLOG(INFO) << "Restarting data prefetching from start."; - layer->lines_id_ = 0; - if (layer->layer_param_.image_data_param().shuffle()) { - layer->ShuffleImages(); + lines_id_ = 0; + if (this->layer_param_.image_data_param().shuffle()) { + ShuffleImages(); } } } - - return reinterpret_cast(NULL); } template @@ -239,8 +233,7 @@ void ImageDataLayer::CreatePrefetchThread() { prefetch_rng_.reset(); } // Create the thread. - CHECK(!pthread_create(&thread_, NULL, ImageDataLayerPrefetch, - static_cast(this))) << "Pthread execution failed."; + CHECK(!StartInternalThread()) << "Pthread execution failed"; } template @@ -255,9 +248,10 @@ void ImageDataLayer::ShuffleImages() { } } + template void ImageDataLayer::JoinPrefetchThread() { - CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed."; + CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed"; } template diff --git a/src/caffe/layers/window_data_layer.cpp b/src/caffe/layers/window_data_layer.cpp index 9b4a23d..fe7f61c 100644 --- a/src/caffe/layers/window_data_layer.cpp +++ b/src/caffe/layers/window_data_layer.cpp @@ -3,7 +3,6 @@ // Based on data_layer.cpp by Yangqing Jia. 
#include -#include #include #include @@ -28,34 +27,32 @@ namespace caffe { +// Thread fetching the data template -void* WindowDataLayerPrefetch(void* layer_pointer) { - WindowDataLayer* layer = - reinterpret_cast*>(layer_pointer); - +void WindowDataLayer::InternalThreadEntry() { // At each iteration, sample N windows where N*p are foreground (object) // windows and N*(1-p) are background (non-object) windows - Dtype* top_data = layer->prefetch_data_.mutable_cpu_data(); - Dtype* top_label = layer->prefetch_label_.mutable_cpu_data(); - const Dtype scale = layer->layer_param_.window_data_param().scale(); - const int batch_size = layer->layer_param_.window_data_param().batch_size(); - const int crop_size = layer->layer_param_.window_data_param().crop_size(); - const int context_pad = layer->layer_param_.window_data_param().context_pad(); - const bool mirror = layer->layer_param_.window_data_param().mirror(); + Dtype* top_data = prefetch_data_.mutable_cpu_data(); + Dtype* top_label = prefetch_label_.mutable_cpu_data(); + const Dtype scale = this->layer_param_.window_data_param().scale(); + const int batch_size = this->layer_param_.window_data_param().batch_size(); + const int crop_size = this->layer_param_.window_data_param().crop_size(); + const int context_pad = this->layer_param_.window_data_param().context_pad(); + const bool mirror = this->layer_param_.window_data_param().mirror(); const float fg_fraction = - layer->layer_param_.window_data_param().fg_fraction(); - const Dtype* mean = layer->data_mean_.cpu_data(); - const int mean_off = (layer->data_mean_.width() - crop_size) / 2; - const int mean_width = layer->data_mean_.width(); - const int mean_height = layer->data_mean_.height(); + this->layer_param_.window_data_param().fg_fraction(); + const Dtype* mean = data_mean_.cpu_data(); + const int mean_off = (data_mean_.width() - crop_size) / 2; + const int mean_width = data_mean_.width(); + const int mean_height = data_mean_.height(); cv::Size 
cv_crop_size(crop_size, crop_size); - const string& crop_mode = layer->layer_param_.window_data_param().crop_mode(); + const string& crop_mode = this->layer_param_.window_data_param().crop_mode(); bool use_square = (crop_mode == "square") ? true : false; // zero out batch - caffe_set(layer->prefetch_data_.count(), Dtype(0), top_data); + caffe_set(prefetch_data_.count(), Dtype(0), top_data); const int num_fg = static_cast(static_cast(batch_size) * fg_fraction); @@ -66,24 +63,24 @@ void* WindowDataLayerPrefetch(void* layer_pointer) { for (int is_fg = 0; is_fg < 2; ++is_fg) { for (int dummy = 0; dummy < num_samples[is_fg]; ++dummy) { // sample a window - const unsigned int rand_index = layer->PrefetchRand(); + const unsigned int rand_index = PrefetchRand(); vector window = (is_fg) ? - layer->fg_windows_[rand_index % layer->fg_windows_.size()] : - layer->bg_windows_[rand_index % layer->bg_windows_.size()]; + fg_windows_[rand_index % fg_windows_.size()] : + bg_windows_[rand_index % bg_windows_.size()]; bool do_mirror = false; - if (mirror && layer->PrefetchRand() % 2) { + if (mirror && PrefetchRand() % 2) { do_mirror = true; } // load the image containing the window pair > image = - layer->image_database_[window[WindowDataLayer::IMAGE_INDEX]]; + image_database_[window[WindowDataLayer::IMAGE_INDEX]]; cv::Mat cv_img = cv::imread(image.first, CV_LOAD_IMAGE_COLOR); if (!cv_img.data) { LOG(ERROR) << "Could not open or find file " << image.first; - return reinterpret_cast(NULL); + return; } const int channels = cv_img.channels(); @@ -210,7 +207,7 @@ void* WindowDataLayerPrefetch(void* layer_pointer) { // useful debugging code for dumping transformed windows to disk string file_id; std::stringstream ss; - ss << layer->PrefetchRand(); + ss << PrefetchRand(); ss >> file_id; std::ofstream inf((string("dump/") + file_id + string("_info.txt")).c_str(), std::ofstream::out); @@ -242,8 +239,6 @@ void* WindowDataLayerPrefetch(void* layer_pointer) { item_id++; } } - - return 
reinterpret_cast(NULL); } template @@ -415,13 +410,12 @@ void WindowDataLayer::CreatePrefetchThread() { prefetch_rng_.reset(); } // Create the thread. - CHECK(!pthread_create(&thread_, NULL, WindowDataLayerPrefetch, - static_cast(this))) << "Pthread execution failed."; + CHECK(!StartInternalThread()) << "Pthread execution failed."; } template void WindowDataLayer::JoinPrefetchThread() { - CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed."; + CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed."; } template diff --git a/src/caffe/test/test_power_layer.cpp b/src/caffe/test/test_power_layer.cpp index d4e6fc5..44fcbd9 100644 --- a/src/caffe/test/test_power_layer.cpp +++ b/src/caffe/test/test_power_layer.cpp @@ -13,8 +13,6 @@ #include "caffe/test/test_caffe_main.hpp" -using std::isnan; - namespace caffe { template -- 2.7.4