Cleanup pthread code for data layers
authorTANGUY Arnaud <arn.tanguy@gmail.com>
Wed, 16 Jul 2014 13:20:15 +0000 (15:20 +0200)
committerTANGUY Arnaud <arn.tanguy@gmail.com>
Thu, 24 Jul 2014 14:18:52 +0000 (16:18 +0200)
Get rid of the ugly "friend" thread function. Simplify the creation of the
thread by inheriting from a base class. Thus, defining the thread is as
simple as overriding a virtual function.

include/caffe/data_layers.hpp
include/caffe/internal_thread.hpp [new file with mode: 0644]
src/caffe/layers/data_layer.cpp
src/caffe/layers/image_data_layer.cpp
src/caffe/layers/window_data_layer.cpp
src/caffe/test/test_power_layer.cpp

index 1bb6bce..107d780 100644 (file)
@@ -9,7 +9,6 @@
 
 #include "leveldb/db.h"
 #include "lmdb.h"
-#include "pthread.h"
 #include "hdf5.h"
 #include "boost/scoped_ptr.hpp"
 
@@ -18,6 +17,7 @@
 #include "caffe/filler.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/proto/caffe.pb.h"
+#include "caffe/internal_thread.hpp"
 
 namespace caffe {
 
@@ -27,15 +27,8 @@ namespace caffe {
 // TODO: DataLayer, ImageDataLayer, and WindowDataLayer all have the
 // same basic structure and a lot of duplicated code.
 
-// This function is used to create a pthread that prefetches the data.
 template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer);
-
-template <typename Dtype>
-class DataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* DataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class DataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit DataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -63,6 +56,8 @@ class DataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  // The thread's function
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
 
@@ -80,7 +75,6 @@ class DataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
@@ -182,15 +176,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
   Blob<Dtype> label_blob_;
 };
 
-// This function is used to create a pthread that prefetches the data.
-template <typename Dtype>
-void* ImageDataLayerPrefetch(void* layer_pointer);
-
 template <typename Dtype>
-class ImageDataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* ImageDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class ImageDataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit ImageDataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -219,6 +206,7 @@ class ImageDataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
   vector<std::pair<std::string, int> > lines_;
@@ -227,7 +215,6 @@ class ImageDataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
@@ -277,15 +264,8 @@ class MemoryDataLayer : public Layer<Dtype> {
   int pos_;
 };
 
-// This function is used to create a pthread that prefetches the window data.
-template <typename Dtype>
-void* WindowDataLayerPrefetch(void* layer_pointer);
-
 template <typename Dtype>
-class WindowDataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* WindowDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class WindowDataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit WindowDataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -312,9 +292,9 @@ class WindowDataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp
new file mode 100644 (file)
index 0000000..46f6736
--- /dev/null
@@ -0,0 +1,46 @@
+// Copyright 2014 BVLC and contributors.
+
+#ifndef CAFFE_INTERNAL_THREAD_HPP_
+#define CAFFE_INTERNAL_THREAD_HPP_
+
+#include <pthread.h>
+
+namespace caffe {
+
+/**
+ * Virtual class encapsulating a pthread, for use as a base class.
+ * A child class acquires the ability to run a single pthread
+ * by reimplementing the virtual function InternalThreadEntry.
+ */
+class InternalThread {
+ public:
+  InternalThread() {}
+  virtual ~InternalThread() {}
+
+  /** Returns 0 on success, nonzero on failure (pthread_create semantics). **/
+  bool StartInternalThread() {
+    return pthread_create(&_thread, NULL, InternalThreadEntryFunc, this);
+  }
+
+  /** Blocks until the internal thread has exited; returns nonzero on failure. */
+  bool WaitForInternalThreadToExit() {
+    return pthread_join(_thread, NULL);
+  }
+
+ protected:
+  /* Implement this method in your subclass
+     with the code you want your thread to run. */
+  virtual void InternalThreadEntry() = 0;
+
+ private:
+  static void * InternalThreadEntryFunc(void * This) {
+    reinterpret_cast<InternalThread *>(This)->InternalThreadEntry();
+    return NULL;
+  }
+
+  pthread_t _thread;
+};
+
+}  // namespace caffe
+
+#endif  // CAFFE_INTERNAL_THREAD_HPP_
index b6afaa9..ef27c1a 100644 (file)
@@ -2,7 +2,6 @@
 
 #include <stdint.h>
 #include <leveldb/db.h>
-#include <pthread.h>
 
 #include <string>
 #include <vector>
 
 namespace caffe {
 
+// This function is used to create a pthread that prefetches the data.
 template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer) {
-  CHECK(layer_pointer);
-  DataLayer<Dtype>* layer = static_cast<DataLayer<Dtype>*>(layer_pointer);
-  CHECK(layer);
+void DataLayer<Dtype>::InternalThreadEntry() {
   Datum datum;
-  CHECK(layer->prefetch_data_.count());
-  Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
+  CHECK(prefetch_data_.count());
+  Dtype* top_data = prefetch_data_.mutable_cpu_data();
   Dtype* top_label = NULL;  // suppress warnings about uninitialized variables
-  if (layer->output_labels_) {
-    top_label = layer->prefetch_label_.mutable_cpu_data();
+  if (output_labels_) {
+    top_label = prefetch_label_.mutable_cpu_data();
   }
-  const Dtype scale = layer->layer_param_.data_param().scale();
-  const int batch_size = layer->layer_param_.data_param().batch_size();
-  const int crop_size = layer->layer_param_.data_param().crop_size();
-  const bool mirror = layer->layer_param_.data_param().mirror();
+  const Dtype scale = this->layer_param_.data_param().scale();
+  const int batch_size = this->layer_param_.data_param().batch_size();
+  const int crop_size = this->layer_param_.data_param().crop_size();
+  const bool mirror = this->layer_param_.data_param().mirror();
 
   if (mirror && crop_size == 0) {
     LOG(FATAL) << "Current implementation requires mirror and crop_size to be "
         << "set at the same time.";
   }
   // datum scales
-  const int channels = layer->datum_channels_;
-  const int height = layer->datum_height_;
-  const int width = layer->datum_width_;
-  const int size = layer->datum_size_;
-  const Dtype* mean = layer->data_mean_.cpu_data();
+  const int channels = datum_channels_;
+  const int height = datum_height_;
+  const int width = datum_width_;
+  const int size = datum_size_;
+  const Dtype* mean = data_mean_.cpu_data();
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     // get a blob
-    switch (layer->layer_param_.data_param().backend()) {
+    switch (this->layer_param_.data_param().backend()) {
     case DataParameter_DB_LEVELDB:
-      CHECK(layer->iter_);
-      CHECK(layer->iter_->Valid());
-      datum.ParseFromString(layer->iter_->value().ToString());
+      CHECK(iter_);
+      CHECK(iter_->Valid());
+      datum.ParseFromString(iter_->value().ToString());
       break;
     case DataParameter_DB_LMDB:
-      CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-              &layer->mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
-      datum.ParseFromArray(layer->mdb_value_.mv_data,
-          layer->mdb_value_.mv_size);
+      CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+              &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
+      datum.ParseFromArray(mdb_value_.mv_data,
+          mdb_value_.mv_size);
       break;
     default:
       LOG(FATAL) << "Unknown database backend";
@@ -66,14 +63,14 @@ void* DataLayerPrefetch(void* layer_pointer) {
       CHECK(data.size()) << "Image cropping only support uint8 data";
       int h_off, w_off;
       // We only do random crop when we do training.
-      if (layer->phase_ == Caffe::TRAIN) {
-        h_off = layer->PrefetchRand() % (height - crop_size);
-        w_off = layer->PrefetchRand() % (width - crop_size);
+      if (phase_ == Caffe::TRAIN) {
+        h_off = PrefetchRand() % (height - crop_size);
+        w_off = PrefetchRand() % (width - crop_size);
       } else {
         h_off = (height - crop_size) / 2;
         w_off = (width - crop_size) / 2;
       }
-      if (mirror && layer->PrefetchRand() % 2) {
+      if (mirror && PrefetchRand() % 2) {
         // Copy mirrored version
         for (int c = 0; c < channels; ++c) {
           for (int h = 0; h < crop_size; ++h) {
@@ -118,34 +115,32 @@ void* DataLayerPrefetch(void* layer_pointer) {
       }
     }
 
-    if (layer->output_labels_) {
+    if (output_labels_) {
       top_label[item_id] = datum.label();
     }
     // go to the next iter
-    switch (layer->layer_param_.data_param().backend()) {
+    switch (this->layer_param_.data_param().backend()) {
     case DataParameter_DB_LEVELDB:
-      layer->iter_->Next();
-      if (!layer->iter_->Valid()) {
+      iter_->Next();
+      if (!iter_->Valid()) {
         // We have reached the end. Restart from the first.
         DLOG(INFO) << "Restarting data prefetching from start.";
-        layer->iter_->SeekToFirst();
+        iter_->SeekToFirst();
       }
       break;
     case DataParameter_DB_LMDB:
-      if (mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-              &layer->mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+      if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+              &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
         // We have reached the end. Restart from the first.
         DLOG(INFO) << "Restarting data prefetching from start.";
-        CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-                &layer->mdb_value_, MDB_FIRST), MDB_SUCCESS);
+        CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+                &mdb_value_, MDB_FIRST), MDB_SUCCESS);
       }
       break;
     default:
       LOG(FATAL) << "Unknown database backend";
     }
   }
-
-  return static_cast<void*>(NULL);
 }
 
 template <typename Dtype>
@@ -323,14 +318,12 @@ void DataLayer<Dtype>::CreatePrefetchThread() {
   } else {
     prefetch_rng_.reset();
   }
-  // Create the thread.
-  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>,
-        static_cast<void*>(this))) << "Pthread execution failed.";
+  CHECK(!StartInternalThread()) << "Pthread execution failed";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::JoinPrefetchThread() {
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed";
 }
 
 template <typename Dtype>
index 30050dd..1ba2a3b 100644 (file)
@@ -2,7 +2,6 @@
 
 #include <stdint.h>
 #include <leveldb/db.h>
-#include <pthread.h>
 
 #include <string>
 #include <vector>
 
 namespace caffe {
 
+// This function is used to create a pthread that prefetches the data.
 template <typename Dtype>
-void* ImageDataLayerPrefetch(void* layer_pointer) {
-  CHECK(layer_pointer);
-  ImageDataLayer<Dtype>* layer =
-      reinterpret_cast<ImageDataLayer<Dtype>*>(layer_pointer);
-  CHECK(layer);
+void ImageDataLayer<Dtype>::InternalThreadEntry() {
   Datum datum;
-  CHECK(layer->prefetch_data_.count());
-  Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
-  Dtype* top_label = layer->prefetch_label_.mutable_cpu_data();
-  ImageDataParameter image_data_param = layer->layer_param_.image_data_param();
+  CHECK(prefetch_data_.count());
+  Dtype* top_data = prefetch_data_.mutable_cpu_data();
+  Dtype* top_label = prefetch_label_.mutable_cpu_data();
+  ImageDataParameter image_data_param = this->layer_param_.image_data_param();
   const Dtype scale = image_data_param.scale();
   const int batch_size = image_data_param.batch_size();
   const int crop_size = image_data_param.crop_size();
@@ -41,17 +37,17 @@ void* ImageDataLayerPrefetch(void* layer_pointer) {
         << "set at the same time.";
   }
   // datum scales
-  const int channels = layer->datum_channels_;
-  const int height = layer->datum_height_;
-  const int width = layer->datum_width_;
-  const int size = layer->datum_size_;
-  const int lines_size = layer->lines_.size();
-  const Dtype* mean = layer->data_mean_.cpu_data();
+  const int channels = datum_channels_;
+  const int height = datum_height_;
+  const int width = datum_width_;
+  const int size = datum_size_;
+  const int lines_size = lines_.size();
+  const Dtype* mean = data_mean_.cpu_data();
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     // get a blob
-    CHECK_GT(lines_size, layer->lines_id_);
-    if (!ReadImageToDatum(layer->lines_[layer->lines_id_].first,
-          layer->lines_[layer->lines_id_].second,
+    CHECK_GT(lines_size, lines_id_);
+    if (!ReadImageToDatum(lines_[lines_id_].first,
+          lines_[lines_id_].second,
           new_height, new_width, &datum)) {
       continue;
     }
@@ -60,14 +56,14 @@ void* ImageDataLayerPrefetch(void* layer_pointer) {
       CHECK(data.size()) << "Image cropping only support uint8 data";
       int h_off, w_off;
       // We only do random crop when we do training.
-      if (layer->phase_ == Caffe::TRAIN) {
-        h_off = layer->PrefetchRand() % (height - crop_size);
-        w_off = layer->PrefetchRand() % (width - crop_size);
+      if (phase_ == Caffe::TRAIN) {
+        h_off = PrefetchRand() % (height - crop_size);
+        w_off = PrefetchRand() % (width - crop_size);
       } else {
         h_off = (height - crop_size) / 2;
         w_off = (width - crop_size) / 2;
       }
-      if (mirror && layer->PrefetchRand() % 2) {
+      if (mirror && PrefetchRand() % 2) {
         // Copy mirrored version
         for (int c = 0; c < channels; ++c) {
           for (int h = 0; h < crop_size; ++h) {
@@ -114,18 +110,16 @@ void* ImageDataLayerPrefetch(void* layer_pointer) {
 
     top_label[item_id] = datum.label();
     // go to the next iter
-    layer->lines_id_++;
-    if (layer->lines_id_ >= lines_size) {
+    lines_id_++;
+    if (lines_id_ >= lines_size) {
       // We have reached the end. Restart from the first.
       DLOG(INFO) << "Restarting data prefetching from start.";
-      layer->lines_id_ = 0;
-      if (layer->layer_param_.image_data_param().shuffle()) {
-        layer->ShuffleImages();
+      lines_id_ = 0;
+      if (this->layer_param_.image_data_param().shuffle()) {
+        ShuffleImages();
       }
     }
   }
-
-  return reinterpret_cast<void*>(NULL);
 }
 
 template <typename Dtype>
@@ -239,8 +233,7 @@ void ImageDataLayer<Dtype>::CreatePrefetchThread() {
     prefetch_rng_.reset();
   }
   // Create the thread.
-  CHECK(!pthread_create(&thread_, NULL, ImageDataLayerPrefetch<Dtype>,
-        static_cast<void*>(this))) << "Pthread execution failed.";
+  CHECK(!StartInternalThread()) << "Pthread execution failed";
 }
 
 template <typename Dtype>
@@ -255,9 +248,10 @@ void ImageDataLayer<Dtype>::ShuffleImages() {
   }
 }
 
+
 template <typename Dtype>
 void ImageDataLayer<Dtype>::JoinPrefetchThread() {
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed";
 }
 
 template <typename Dtype>
index 9b4a23d..fe7f61c 100644 (file)
@@ -3,7 +3,6 @@
 // Based on data_layer.cpp by Yangqing Jia.
 
 #include <stdint.h>
-#include <pthread.h>
 
 #include <algorithm>
 #include <string>
 
 namespace caffe {
 
+// Thread fetching the data
 template <typename Dtype>
-void* WindowDataLayerPrefetch(void* layer_pointer) {
-  WindowDataLayer<Dtype>* layer =
-      reinterpret_cast<WindowDataLayer<Dtype>*>(layer_pointer);
-
+void WindowDataLayer<Dtype>::InternalThreadEntry() {
   // At each iteration, sample N windows where N*p are foreground (object)
   // windows and N*(1-p) are background (non-object) windows
 
-  Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
-  Dtype* top_label = layer->prefetch_label_.mutable_cpu_data();
-  const Dtype scale = layer->layer_param_.window_data_param().scale();
-  const int batch_size = layer->layer_param_.window_data_param().batch_size();
-  const int crop_size = layer->layer_param_.window_data_param().crop_size();
-  const int context_pad = layer->layer_param_.window_data_param().context_pad();
-  const bool mirror = layer->layer_param_.window_data_param().mirror();
+  Dtype* top_data = prefetch_data_.mutable_cpu_data();
+  Dtype* top_label = prefetch_label_.mutable_cpu_data();
+  const Dtype scale = this->layer_param_.window_data_param().scale();
+  const int batch_size = this->layer_param_.window_data_param().batch_size();
+  const int crop_size = this->layer_param_.window_data_param().crop_size();
+  const int context_pad = this->layer_param_.window_data_param().context_pad();
+  const bool mirror = this->layer_param_.window_data_param().mirror();
   const float fg_fraction =
-      layer->layer_param_.window_data_param().fg_fraction();
-  const Dtype* mean = layer->data_mean_.cpu_data();
-  const int mean_off = (layer->data_mean_.width() - crop_size) / 2;
-  const int mean_width = layer->data_mean_.width();
-  const int mean_height = layer->data_mean_.height();
+      this->layer_param_.window_data_param().fg_fraction();
+  const Dtype* mean = data_mean_.cpu_data();
+  const int mean_off = (data_mean_.width() - crop_size) / 2;
+  const int mean_width = data_mean_.width();
+  const int mean_height = data_mean_.height();
   cv::Size cv_crop_size(crop_size, crop_size);
-  const string& crop_mode = layer->layer_param_.window_data_param().crop_mode();
+  const string& crop_mode = this->layer_param_.window_data_param().crop_mode();
 
   bool use_square = (crop_mode == "square") ? true : false;
 
   // zero out batch
-  caffe_set(layer->prefetch_data_.count(), Dtype(0), top_data);
+  caffe_set(prefetch_data_.count(), Dtype(0), top_data);
 
   const int num_fg = static_cast<int>(static_cast<float>(batch_size)
       * fg_fraction);
@@ -66,24 +63,24 @@ void* WindowDataLayerPrefetch(void* layer_pointer) {
   for (int is_fg = 0; is_fg < 2; ++is_fg) {
     for (int dummy = 0; dummy < num_samples[is_fg]; ++dummy) {
       // sample a window
-      const unsigned int rand_index = layer->PrefetchRand();
+      const unsigned int rand_index = PrefetchRand();
       vector<float> window = (is_fg) ?
-          layer->fg_windows_[rand_index % layer->fg_windows_.size()] :
-          layer->bg_windows_[rand_index % layer->bg_windows_.size()];
+          fg_windows_[rand_index % fg_windows_.size()] :
+          bg_windows_[rand_index % bg_windows_.size()];
 
       bool do_mirror = false;
-      if (mirror && layer->PrefetchRand() % 2) {
+      if (mirror && PrefetchRand() % 2) {
         do_mirror = true;
       }
 
       // load the image containing the window
       pair<std::string, vector<int> > image =
-          layer->image_database_[window[WindowDataLayer<Dtype>::IMAGE_INDEX]];
+          image_database_[window[WindowDataLayer<Dtype>::IMAGE_INDEX]];
 
       cv::Mat cv_img = cv::imread(image.first, CV_LOAD_IMAGE_COLOR);
       if (!cv_img.data) {
         LOG(ERROR) << "Could not open or find file " << image.first;
-        return reinterpret_cast<void*>(NULL);
+        return;
       }
       const int channels = cv_img.channels();
 
@@ -210,7 +207,7 @@ void* WindowDataLayerPrefetch(void* layer_pointer) {
       // useful debugging code for dumping transformed windows to disk
       string file_id;
       std::stringstream ss;
-      ss << layer->PrefetchRand();
+      ss << PrefetchRand();
       ss >> file_id;
       std::ofstream inf((string("dump/") + file_id +
           string("_info.txt")).c_str(), std::ofstream::out);
@@ -242,8 +239,6 @@ void* WindowDataLayerPrefetch(void* layer_pointer) {
       item_id++;
     }
   }
-
-  return reinterpret_cast<void*>(NULL);
 }
 
 template <typename Dtype>
@@ -415,13 +410,12 @@ void WindowDataLayer<Dtype>::CreatePrefetchThread() {
     prefetch_rng_.reset();
   }
   // Create the thread.
-  CHECK(!pthread_create(&thread_, NULL, WindowDataLayerPrefetch<Dtype>,
-        static_cast<void*>(this))) << "Pthread execution failed.";
+  CHECK(!StartInternalThread()) << "Pthread execution failed.";
 }
 
 template <typename Dtype>
 void WindowDataLayer<Dtype>::JoinPrefetchThread() {
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed.";
 }
 
 template <typename Dtype>
index d4e6fc5..44fcbd9 100644 (file)
@@ -13,8 +13,6 @@
 
 #include "caffe/test/test_caffe_main.hpp"
 
-using std::isnan;
-
 namespace caffe {
 
 template <typename TypeParam>