cleanup data_layer, add prefetch_rng_ field to it and use instead of
rand -- seeded tests still fail

author    Jeff Donahue <jeff.donahue@gmail.com>
          Tue, 22 Apr 2014 19:41:02 +0000 (12:41 -0700)
committer Jeff Donahue <jeff.donahue@gmail.com>
          Wed, 23 Apr 2014 06:40:18 +0000 (23:40 -0700)

include/caffe/util/math_functions.hpp
include/caffe/vision_layers.hpp
src/caffe/layers/data_layer.cpp
src/caffe/test/test_data_layer.cpp
src/caffe/util/math_functions.cpp
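
Motivation: the prefetch thread previously called the C library's rand(),
which is neither thread-safe nor tied to Caffe::set_random_seed(), so seeded
runs produced irreproducible crops and mirrors. This commit gives the
prefetch thread its own generator (prefetch_rng_), reseeded from the globally
seeded stream each time the thread is (re)created. A minimal sketch of the
pattern, assuming caffe::rng_t is the boost::mt19937 typedef from
caffe/util/rng.hpp (PrefetchRng is a hypothetical stand-in for the layer):

    #include <boost/random/mersenne_twister.hpp>

    // One seeded generator per prefetch thread: draws are thread-local,
    // and the seed comes from the stream Caffe::set_random_seed() controls.
    class PrefetchRng {
     public:
      explicit PrefetchRng(unsigned int seed) : rng_(seed) {}
      unsigned int Rand() { return rng_(); }  // replaces rand()
     private:
      boost::mt19937 rng_;
    };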

index 62290123b8b6c3b404e7a385ec8b2590ff2c7053..887c8f2e6e069aa38bbdf7504122a2ee468642cb 100644 (file)
@@ -105,6 +105,8 @@ void caffe_powx(const int n, const Dtype* a, const Dtype b, Dtype* y);
 template <typename Dtype>
 void caffe_gpu_powx(const int n, const Dtype* a, const Dtype b, Dtype* y);
 
+unsigned int caffe_rng_rand();
+
 template <typename Dtype>
 Dtype caffe_nextafter(const Dtype b);
 
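
caffe_rng_rand() gives call sites a seeded, thread-safe replacement for
rand(). A usage sketch (max_skip is a hypothetical bound; the modulo bias of
% is carried over unchanged from the old rand()-based code):

    #include "caffe/util/math_functions.hpp"

    // Before: unsigned int skip = rand() % max_skip;  // unseeded, thread-unsafe
    // After: draws from the generator that Caffe::set_random_seed() seeds.
    unsigned int skip = caffe_rng_rand() % max_skip;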
index 6084c27d6f6c97ca06ab302d2fb55997f500a492..4f5e41fc16362080114e47ec00a748ee7e2ce06d 100644 (file)
@@ -286,6 +286,11 @@ class DataLayer : public Layer<Dtype> {
   virtual void Backward_gpu(const vector<Blob<Dtype>*>& top,
       const bool propagate_down, vector<Blob<Dtype>*>* bottom) { return; }
 
+  virtual void CreatePrefetchThread();
+  virtual void JoinPrefetchThread();
+  virtual unsigned int PrefetchRand();
+
+  shared_ptr<Caffe::RNG> prefetch_rng_;
   shared_ptr<leveldb::DB> db_;
   shared_ptr<leveldb::Iterator> iter_;
   int datum_channels_;
index e4c34eb6ea7bd913ecadb227fb5edc3b52e7e740..ae673f03593f314b79f8e51e370ca03f75fca9d3 100644 (file)
@@ -10,6 +10,7 @@
 #include "caffe/layer.hpp"
 #include "caffe/util/io.hpp"
 #include "caffe/util/math_functions.hpp"
+#include "caffe/util/rng.hpp"
 #include "caffe/vision_layers.hpp"
 
 using std::string;
@@ -19,7 +20,7 @@ namespace caffe {
 template <typename Dtype>
 void* DataLayerPrefetch(void* layer_pointer) {
   CHECK(layer_pointer);
-  DataLayer<Dtype>* layer = reinterpret_cast<DataLayer<Dtype>*>(layer_pointer);
+  DataLayer<Dtype>* layer = static_cast<DataLayer<Dtype>*>(layer_pointer);
   CHECK(layer);
   Datum datum;
   CHECK(layer->prefetch_data_);
@@ -54,27 +55,23 @@ void* DataLayerPrefetch(void* layer_pointer) {
       int h_off, w_off;
       // We only do random crop when we do training.
       if (layer->phase_ == Caffe::TRAIN) {
-        // NOLINT_NEXT_LINE(runtime/threadsafe_fn)
-        h_off = rand() % (height - crop_size);
-        // NOLINT_NEXT_LINE(runtime/threadsafe_fn)
-        w_off = rand() % (width - crop_size);
+        h_off = layer->PrefetchRand() % (height - crop_size);
+        w_off = layer->PrefetchRand() % (width - crop_size);
       } else {
         h_off = (height - crop_size) / 2;
         w_off = (width - crop_size) / 2;
       }
-      // NOLINT_NEXT_LINE(runtime/threadsafe_fn)
-      if (mirror && rand() % 2) {
+      if (mirror && layer->PrefetchRand() % 2) {
         // Copy mirrored version
         for (int c = 0; c < channels; ++c) {
           for (int h = 0; h < crop_size; ++h) {
             for (int w = 0; w < crop_size; ++w) {
-              top_data[((item_id * channels + c) * crop_size + h) * crop_size
-                       + crop_size - 1 - w] =
-                  (static_cast<Dtype>(
-                      (uint8_t)data[(c * height + h + h_off) * width
-                                    + w + w_off])
-                    - mean[(c * height + h + h_off) * width + w + w_off])
-                  * scale;
+              int top_index = ((item_id * channels + c) * crop_size + h)
+                              * crop_size + (crop_size - 1 - w);
+              int data_index = (c * height + h + h_off) * width + w + w_off;
+              Dtype datum_element =
+                  static_cast<Dtype>(static_cast<uint8_t>(data[data_index]));
+              top_data[top_index] = (datum_element - mean[data_index]) * scale;
             }
           }
         }
@@ -83,13 +80,12 @@ void* DataLayerPrefetch(void* layer_pointer) {
         for (int c = 0; c < channels; ++c) {
           for (int h = 0; h < crop_size; ++h) {
             for (int w = 0; w < crop_size; ++w) {
-              top_data[((item_id * channels + c) * crop_size + h) * crop_size
-                       + w]
-                  = (static_cast<Dtype>(
-                      (uint8_t)data[(c * height + h + h_off) * width
-                                    + w + w_off])
-                     - mean[(c * height + h + h_off) * width + w + w_off])
-                  * scale;
+              int top_index = ((item_id * channels + c) * crop_size + h)
+                              * crop_size + w;
+              int data_index = (c * height + h + h_off) * width + w + w_off;
+              Dtype datum_element =
+                  static_cast<Dtype>(static_cast<uint8_t>(data[data_index]));
+              top_data[top_index] = (datum_element - mean[data_index]) * scale;
             }
           }
         }
@@ -98,8 +94,9 @@ void* DataLayerPrefetch(void* layer_pointer) {
       // we will prefer to use data() first, and then try float_data()
       if (data.size()) {
         for (int j = 0; j < size; ++j) {
-          top_data[item_id * size + j] =
-              (static_cast<Dtype>((uint8_t)data[j]) - mean[j]) * scale;
+          Dtype datum_element =
+              static_cast<Dtype>(static_cast<uint8_t>(data[j]));
+          top_data[item_id * size + j] = (datum_element - mean[j]) * scale;
         }
       } else {
         for (int j = 0; j < size; ++j) {
@@ -121,13 +118,12 @@ void* DataLayerPrefetch(void* layer_pointer) {
     }
   }
 
-  return reinterpret_cast<void*>(NULL);
+  return static_cast<void*>(NULL);
 }
 
 template <typename Dtype>
 DataLayer<Dtype>::~DataLayer<Dtype>() {
-  // Finally, join the thread
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  JoinPrefetchThread();
 }
 
 template <typename Dtype>
@@ -157,8 +153,8 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   iter_->SeekToFirst();
   // Check if we would need to randomly skip a few data points
   if (this->layer_param_.data_param().rand_skip()) {
-    // NOLINT_NEXT_LINE(runtime/threadsafe_fn)
-    unsigned int skip = rand() % this->layer_param_.data_param().rand_skip();
+    unsigned int skip = caffe_rng_rand() %
+                        this->layer_param_.data_param().rand_skip();
     LOG(INFO) << "Skipping first " << skip << " data points.";
     while (skip-- > 0) {
       iter_->Next();
@@ -227,17 +223,44 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   }
   data_mean_.cpu_data();
   DLOG(INFO) << "Initializing prefetch";
+  CreatePrefetchThread();
+  DLOG(INFO) << "Prefetch initialized.";
+}
+
+template <typename Dtype>
+void DataLayer<Dtype>::CreatePrefetchThread() {
   phase_ = Caffe::phase();
+  const bool prefetch_needs_rand = (phase_ == Caffe::TRAIN) &&
+      (this->layer_param_.data_param().mirror() ||
+       this->layer_param_.data_param().crop_size());
+  if (prefetch_needs_rand) {
+    const unsigned int prefetch_rng_seed = caffe_rng_rand();
+    prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed));
+  } else {
+    prefetch_rng_.reset();
+  }
+  // Create the thread.
   CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>,
-      reinterpret_cast<void*>(this))) << "Pthread execution failed.";
-  DLOG(INFO) << "Prefetch initialized.";
+        static_cast<void*>(this))) << "Pthread execution failed.";
+}
+
+template <typename Dtype>
+void DataLayer<Dtype>::JoinPrefetchThread() {
+  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+}
+
+template <typename Dtype>
+unsigned int DataLayer<Dtype>::PrefetchRand() {
+  caffe::rng_t* prefetch_rng =
+      static_cast<caffe::rng_t*>(prefetch_rng_->generator());
+  return (*prefetch_rng)();
 }
 
 template <typename Dtype>
 Dtype DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
       vector<Blob<Dtype>*>* top) {
   // First, join the thread
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  JoinPrefetchThread();
   // Copy the data
   caffe_copy(prefetch_data_->count(), prefetch_data_->cpu_data(),
              (*top)[0]->mutable_cpu_data());
@@ -246,9 +269,7 @@ Dtype DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
                (*top)[1]->mutable_cpu_data());
   }
   // Start a new prefetch thread
-  phase_ = Caffe::phase();
-  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>,
-      reinterpret_cast<void*>(this))) << "Pthread execution failed.";
+  CreatePrefetchThread();
   return Dtype(0.);
 }
 
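
The refactor pairs CreatePrefetchThread() with JoinPrefetchThread() at every
point the old code inlined pthread calls: SetUp() starts the first fetch,
Forward_cpu() joins, copies, then restarts, and the destructor joins. A
condensed sketch of the resulting control flow (PrefetchingLayer and Worker
are illustrative stand-ins; error handling and template details elided):

    #include <pthread.h>
    #include <glog/logging.h>

    void* Worker(void* arg) {  // stands in for DataLayerPrefetch<Dtype>
      /* fetch one batch into the layer's prefetch buffers */
      return NULL;
    }

    struct PrefetchingLayer {
      pthread_t thread_;
      void CreatePrefetchThread() {  // in the diff, also reseeds prefetch_rng_
        CHECK(!pthread_create(&thread_, NULL, Worker, static_cast<void*>(this)))
            << "Pthread execution failed.";
      }
      void JoinPrefetchThread() {
        CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
      }
      void Forward() {
        JoinPrefetchThread();    // wait for the batch fetched in the background
        // ... copy prefetched blobs to the top blobs ...
        CreatePrefetchThread();  // overlap the next fetch with downstream work
      }
      ~PrefetchingLayer() { JoinPrefetchThread(); }  // never leak a live thread
    };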
index fad8573d6da4255b6827b526e9f059c01fb4a9de..9847aca8352b7823035c6a405bbd822cf817c967 100644 (file)
@@ -249,6 +249,7 @@ TYPED_TEST(DataLayerTest, TestReadCropTrainSequenceSeededCPU) {
 
   // Get crop sequence with Caffe seed 1701 (done in SetUp), srand seed 1701.
   srand(this->seed_);
+  Caffe::set_random_seed(this->seed_);
   vector<vector<TypeParam> > crop_sequence;
   for (int iter = 0; iter < 2; ++iter) {
     layer.Forward(this->blob_bottom_vec_, &this->blob_top_vec_);
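
Note that crop/mirror randomness now flows through prefetch_rng_, which is
seeded from caffe_rng_rand() on each CreatePrefetchThread(); replaying a crop
sequence therefore requires resetting Caffe's seed as well, while srand()
only covers any remaining rand() call sites (and, per the commit title, the
seeded tests still fail after this change):

    srand(seed);                   // legacy rand() paths
    Caffe::set_random_seed(seed);  // caffe_rng_rand() -> prefetch_rng_ reseed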
index a3f1084367defe72cf4b9a367e0cff785b20d43b..ba2492aa635465d2efd94787732a21920b51e255 100644 (file)
@@ -302,6 +302,10 @@ void caffe_exp<double>(const int n, const double* a, double* y) {
   vdExp(n, a, y);
 }
 
+unsigned int caffe_rng_rand() {
+  return (*caffe_rng())();
+}
+
 template <typename Dtype>
 Dtype caffe_nextafter(const Dtype b) {
   return boost::math::nextafter<Dtype>(