Add DataReader for parallel training with one DB session
Author:     Cyprien Noel <cyprien.noel@gmail.com>
AuthorDate: Tue, 19 May 2015 01:06:09 +0000 (18:06 -0700)
Commit:     Evan Shelhamer <shelhamer@imaginarynumber.net>
CommitDate: Sun, 9 Aug 2015 22:13:11 +0000 (15:13 -0700)
- Make sure each solver accesses a different subset of the data
- Sequential reading of DB for performance
- Prefetch a configurable amount of data to host memory
- Distribute data to solvers in a round-robin way for determinism (see the sketch below)
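
For intuition before the diff, a standalone sketch of the round-robin dealing
described above (illustrative only: std::thread stands in for the Boost
threading the commit actually uses, and every name here is invented, not the
committed API):

    // Toy model: one reader thread deals sequential records to per-solver
    // queues in a fixed order, so solver i always receives records
    // i, i + kSolvers, i + 2 * kSolvers, ... on every run.
    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    template <typename T>
    class ToyBlockingQueue {  // minimal stand-in for caffe::BlockingQueue
     public:
      void push(const T& t) {
        { std::lock_guard<std::mutex> lock(mutex_); queue_.push(t); }
        cond_.notify_one();
      }
      T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T t = queue_.front();
        queue_.pop();
        return t;
      }
     private:
      std::queue<T> queue_;
      std::mutex mutex_;
      std::condition_variable cond_;
    };

    int main() {
      const int kSolvers = 2, kRecords = 8;
      std::vector<ToyBlockingQueue<int> > full(kSolvers);
      std::thread reader([&] {
        for (int r = 0; r < kRecords; ++r) {
          full[r % kSolvers].push(r);  // sequential read, round-robin deal
        }
      });
      for (int i = 0; i < kSolvers; ++i) {
        for (int r = 0; r < kRecords / kSolvers; ++r) {
          std::printf("solver %d got record %d\n", i, full[i].pop());
        }
      }
      reader.join();
      return 0;
    }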

include/caffe/data_layers.hpp
include/caffe/data_reader.hpp [new file with mode: 0644]
src/caffe/data_reader.cpp [new file with mode: 0644]
src/caffe/layers/base_data_layer.cpp
src/caffe/layers/data_layer.cpp
src/caffe/proto/caffe.proto
src/caffe/test/test_layer_factory.cpp
src/caffe/test/test_upgrade_proto.cpp
src/caffe/util/blocking_queue.cpp

diff --git a/include/caffe/data_layers.hpp b/include/caffe/data_layers.hpp
index f57ab6b..12e6c36 100644
@@ -5,11 +5,11 @@
 #include <utility>
 #include <vector>
 
-#include "boost/scoped_ptr.hpp"
 #include "hdf5.h"
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
+#include "caffe/data_reader.hpp"
 #include "caffe/data_transformer.hpp"
 #include "caffe/filler.hpp"
 #include "caffe/internal_thread.hpp"
@@ -90,8 +90,7 @@ class BasePrefetchingDataLayer :
 template <typename Dtype>
 class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  public:
-  explicit DataLayer(const LayerParameter& param)
-      : BasePrefetchingDataLayer<Dtype>(param) {}
+  explicit DataLayer(const LayerParameter& param);
   virtual ~DataLayer();
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
@@ -104,8 +103,7 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  protected:
   virtual void load_batch(Batch<Dtype>* batch);
 
-  shared_ptr<db::DB> db_;
-  shared_ptr<db::Cursor> cursor_;
+  DataReader reader_;
 };
 
 /**
diff --git a/include/caffe/data_reader.hpp b/include/caffe/data_reader.hpp
new file mode 100644
index 0000000..8ed5542
--- /dev/null
+++ b/include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
+#ifndef CAFFE_DATA_READER_HPP_
+#define CAFFE_DATA_READER_HPP_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/internal_thread.hpp"
+#include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/db.hpp"
+
+namespace caffe {
+
+/**
+ * @brief Reads data from a source to queues available to data layers.
+ * A single reading thread is created per source, even if multiple solvers
+ * are running in parallel, e.g. for multi-GPU training. This makes sure
+ * databases are read sequentially, and that each solver accesses a different
+ * subset of the database. Data is distributed to solvers in a round-robin
+ * way to keep parallel training deterministic.
+ */
+class DataReader {
+ public:
+  explicit DataReader(const LayerParameter& param);
+  ~DataReader();
+
+  inline BlockingQueue<Datum*>& free() const {
+    return queue_pair_->free_;
+  }
+  inline BlockingQueue<Datum*>& full() const {
+    return queue_pair_->full_;
+  }
+
+ protected:
+  // Queue pairs are shared between a body and its readers
+  class QueuePair {
+   public:
+    explicit QueuePair(int size);
+    ~QueuePair();
+
+    BlockingQueue<Datum*> free_;
+    BlockingQueue<Datum*> full_;
+
+  DISABLE_COPY_AND_ASSIGN(QueuePair);
+  };
+
+  // A single body is created per source
+  class Body : public InternalThread {
+   public:
+    explicit Body(const LayerParameter& param);
+    virtual ~Body();
+
+   protected:
+    void InternalThreadEntry();
+    void read_one(db::Cursor* cursor, QueuePair* qp);
+
+    const LayerParameter param_;
+    BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
+
+    friend class DataReader;
+
+  DISABLE_COPY_AND_ASSIGN(Body);
+  };
+
+  // A source is uniquely identified by its layer name + path, in case
+  // the same database is read from two different locations in the net.
+  static inline string source_key(const LayerParameter& param) {
+    return param.name() + ":" + param.data_param().source();
+  }
+
+  const shared_ptr<QueuePair> queue_pair_;
+  shared_ptr<Body> body_;
+
+  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
+
+DISABLE_COPY_AND_ASSIGN(DataReader);
+};
+
+}  // namespace caffe
+
+#endif  // CAFFE_DATA_READER_HPP_
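
Both queues above come from caffe::BlockingQueue, which this commit only
extends with new instantiations (last file below). As a rough sketch of the
contract the reader and data layer rely on — signatures are inferred from the
call sites in this patch, not copied from the real header:

    #include <cstddef>
    #include <string>

    // Assumed interface of caffe::BlockingQueue<T>; the actual class in
    // util/blocking_queue.hpp may differ in detail.
    template <typename T>
    class BlockingQueue {
     public:
      void push(const T& t);      // enqueue and wake one waiting consumer
      T pop(const std::string& log_on_wait = "");  // block until an item arrives
      bool try_pop(T* t);         // non-blocking; returns false when empty
      T peek();                   // block, but leave the item in the queue
      std::size_t size() const;
    };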
diff --git a/src/caffe/data_reader.cpp b/src/caffe/data_reader.cpp
new file mode 100644
index 0000000..60606f0
--- /dev/null
+++ b/src/caffe/data_reader.cpp
@@ -0,0 +1,121 @@
+#include <boost/thread.hpp>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/data_layers.hpp"
+#include "caffe/data_reader.hpp"
+#include "caffe/proto/caffe.pb.h"
+
+namespace caffe {
+
+using boost::weak_ptr;
+
+map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
+static boost::mutex bodies_mutex_;
+
+DataReader::DataReader(const LayerParameter& param)
+    : queue_pair_(new QueuePair(  //
+        param.data_param().prefetch() * param.data_param().batch_size())) {
+  // Get or create a body
+  boost::mutex::scoped_lock lock(bodies_mutex_);
+  string key = source_key(param);
+  weak_ptr<Body>& weak = bodies_[key];
+  body_ = weak.lock();
+  if (!body_) {
+    body_.reset(new Body(param));
+    bodies_[key] = weak_ptr<Body>(body_);
+  }
+  body_->new_queue_pairs_.push(queue_pair_);
+}
+
+DataReader::~DataReader() {
+  string key = source_key(body_->param_);
+  body_.reset();
+  boost::mutex::scoped_lock lock(bodies_mutex_);
+  if (bodies_[key].expired()) {
+    bodies_.erase(key);
+  }
+}
+
+//
+
+DataReader::QueuePair::QueuePair(int size) {
+  // Initialize the free queue with requested number of datums
+  for (int i = 0; i < size; ++i) {
+    free_.push(new Datum());
+  }
+}
+
+DataReader::QueuePair::~QueuePair() {
+  Datum* datum;
+  while (free_.try_pop(&datum)) {
+    delete datum;
+  }
+  while (full_.try_pop(&datum)) {
+    delete datum;
+  }
+}
+
+//
+
+DataReader::Body::Body(const LayerParameter& param)
+    : param_(param),
+      new_queue_pairs_() {
+  StartInternalThread();
+}
+
+DataReader::Body::~Body() {
+  StopInternalThread();
+}
+
+void DataReader::Body::InternalThreadEntry() {
+  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
+  db->Open(param_.data_param().source(), db::READ);
+  shared_ptr<db::Cursor> cursor(db->NewCursor());
+  vector<shared_ptr<QueuePair> > qps;
+  try {
+    // int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
+    // TODO single solver until multi-gpu merge
+    int solver_count = 1;
+
+    // To ensure deterministic runs, only start running once all solvers
+    // are ready. But solvers need to peek on one item during initialization,
+    // so read one item, then wait for the next solver.
+    for (int i = 0; i < solver_count; ++i) {
+      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
+      read_one(cursor.get(), qp.get());
+      qps.push_back(qp);
+    }
+    // Main loop
+    while (!must_stop()) {
+      for (int i = 0; i < solver_count; ++i) {
+        read_one(cursor.get(), qps[i].get());
+      }
+      // Check no additional readers have been created. This can happen if
+      // more than one net is trained at a time per process, whether single
+      // or multi solver. It might also happen if two data layers have the
+      // same name and source.
+      CHECK_EQ(new_queue_pairs_.size(), 0);
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
+}
+
+void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
+  Datum* datum = qp->free_.pop();
+  // TODO deserialize in-place instead of copy?
+  datum->ParseFromString(cursor->value());
+  qp->full_.push(datum);
+
+  // go to the next iter
+  cursor->Next();
+  if (!cursor->valid()) {
+    DLOG(INFO) << "Restarting data prefetching from start.";
+    cursor->SeekToFirst();
+  }
+}
+
+}  // namespace caffe
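
The constructor and destructor above implement a get-or-create pattern: every
DataReader for the same source shares a single Body — hence one DB session,
cursor, and reading thread — through a map of weak pointers, and the Body is
torn down with its last reader. A standalone sketch of the idea, using std::
smart pointers where the commit uses Boost, and omitting the bodies_mutex_
that guards the map in the real code:

    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    struct Body {};  // stands in for the per-source reading thread

    static std::map<std::string, std::weak_ptr<Body> > bodies;

    std::shared_ptr<Body> get_or_create(const std::string& key) {
      std::weak_ptr<Body>& weak = bodies[key];   // default-constructed if absent
      std::shared_ptr<Body> body = weak.lock();  // still alive via another reader?
      if (!body) {
        body.reset(new Body());                  // first reader for this source
        weak = body;
      }
      return body;
    }

    int main() {
      std::shared_ptr<Body> a = get_or_create("data:examples/train_lmdb");
      std::shared_ptr<Body> b = get_or_create("data:examples/train_lmdb");
      std::cout << (a == b) << std::endl;  // prints 1: one shared Body per source
      return 0;
    }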
diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp
index 9288d91..20f76f6 100644
@@ -108,7 +108,7 @@ void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
   DLOG(INFO) << "Prefetch copied";
   if (this->output_labels_) {
     // Reshape to loaded labels.
-    top[1]->ReshapeLike(prefetch_label_);
+    top[1]->ReshapeLike(batch->label_);
     // Copy the labels.
     caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
         top[1]->mutable_cpu_data());
diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp
index 22d9f43..0932d9f 100644
 #include "caffe/proto/caffe.pb.h"
 #include "caffe/util/benchmark.hpp"
 #include "caffe/util/io.hpp"
-#include "caffe/util/math_functions.hpp"
-#include "caffe/util/rng.hpp"
 
 namespace caffe {
 
 template <typename Dtype>
+DataLayer<Dtype>::DataLayer(const LayerParameter& param)
+  : BasePrefetchingDataLayer<Dtype>(param),
+    reader_(param) {
+}
+
+template <typename Dtype>
 DataLayer<Dtype>::~DataLayer() {
   this->StopInternalThread();
 }
@@ -24,31 +28,19 @@ DataLayer<Dtype>::~DataLayer() {
 template <typename Dtype>
 void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
-  // Initialize DB
-  db_.reset(db::GetDB(this->layer_param_.data_param().backend()));
-  db_->Open(this->layer_param_.data_param().source(), db::READ);
-  cursor_.reset(db_->NewCursor());
+  const int batch_size = this->layer_param_.data_param().batch_size();
+  // Read a data point, and use it to initialize the top blob.
+  Datum& datum = *(reader_.full().peek());
 
-  // Check if we should randomly skip a few data points
-  if (this->layer_param_.data_param().rand_skip()) {
-    unsigned int skip = caffe_rng_rand() %
-                        this->layer_param_.data_param().rand_skip();
-    LOG(INFO) << "Skipping first " << skip << " data points.";
-    while (skip-- > 0) {
-      cursor_->Next();
-    }
-  }
-  // Read a data point, to initialize the prefetch and top blobs.
-  Datum datum;
-  datum.ParseFromString(cursor_->value());
   // Use data_transformer to infer the expected blob shape from datum.
   vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
   this->transformed_data_.Reshape(top_shape);
   // Reshape top[0] and prefetch_data according to the batch_size.
-  top_shape[0] = this->layer_param_.data_param().batch_size();
-  this->prefetch_data_.Reshape(top_shape);
-  top[0]->ReshapeLike(this->prefetch_data_);
-
+  top_shape[0] = batch_size;
+  top[0]->Reshape(top_shape);
+  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
+    this->prefetch_[i].data_.Reshape(top_shape);
+  }
   LOG(INFO) << "output data size: " << top[0]->num() << ","
       << top[0]->channels() << "," << top[0]->height() << ","
       << top[0]->width();
@@ -76,12 +68,11 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   // Reshape according to the first datum of each batch
   // on single input batches allows for inputs of varying dimension.
   const int batch_size = this->layer_param_.data_param().batch_size();
-  Datum datum;
-  datum.ParseFromString(cursor_->value());
+  Datum& datum = *(reader_.full().peek());
   // Use data_transformer to infer the expected blob shape from datum.
   vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
   this->transformed_data_.Reshape(top_shape);
-  // Reshape prefetch_data according to the batch_size.
+  // Reshape batch according to the batch_size.
   top_shape[0] = batch_size;
   batch->data_.Reshape(top_shape);
 
@@ -91,11 +82,10 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   if (this->output_labels_) {
     top_label = batch->label_.mutable_cpu_data();
   }
-  timer.Start();
   for (int item_id = 0; item_id < batch_size; ++item_id) {
+    timer.Start();
     // get a datum
-    Datum datum;
-    datum.ParseFromString(cursor_->value());
+    Datum& datum = *(reader_.full().pop("Waiting for data"));
     read_time += timer.MicroSeconds();
     timer.Start();
     // Apply data transformations (mirror, scale, crop...)
@@ -107,13 +97,8 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
       top_label[item_id] = datum.label();
     }
     trans_time += timer.MicroSeconds();
-    timer.Start();
-    // go to the next item.
-    cursor_->Next();
-    if (!cursor_->valid()) {
-      DLOG(INFO) << "Restarting data prefetching from start.";
-      cursor_->SeekToFirst();
-    }
+
+    reader_.free().push(const_cast<Datum*>(&datum));
   }
   timer.Stop();
   batch_timer.Stop();
diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto
index 89f1459..4116541 100644
@@ -500,6 +500,7 @@ message DataParameter {
   // to avoid all asynchronous sgd clients to start at the same point. The skip
   // point would be set as rand_skip * rand(0,1). Note that rand_skip should not
   // be larger than the number of keys in the database.
+  // DEPRECATED. Each solver accesses a different subset of the database.
   optional uint32 rand_skip = 7 [default = 0];
   optional DB backend = 8 [default = LEVELDB];
   // DEPRECATED. See TransformationParameter. For data pre-processing, we can do
@@ -515,6 +516,9 @@ message DataParameter {
   optional bool mirror = 6 [default = false];
   // Force the encoded image to have 3 color channels
   optional bool force_encoded_color = 9 [default = false];
+  // Prefetch queue (number of batches to prefetch to host memory;
+  // increase if data access bandwidth varies).
+  optional uint32 prefetch = 10 [default = 4];
 }
 
 message DropoutParameter {
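
The new field gives each source prefetch * batch_size datums of host-side
buffering, which is exactly how the DataReader constructor above sizes its
free queue. A hypothetical configuration sketch (the path and values are
illustrative, not from the commit):

    #include "caffe/proto/caffe.pb.h"

    caffe::LayerParameter MakeDataParam() {
      caffe::LayerParameter param;
      param.set_name("data");
      // Illustrative source path; any LMDB/LevelDB database works.
      param.mutable_data_param()->set_source("examples/mnist/mnist_train_lmdb");
      param.mutable_data_param()->set_batch_size(64);
      // Buffer 8 batches instead of the default 4 to ride out bursty I/O.
      param.mutable_data_param()->set_prefetch(8);
      return param;
    }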
diff --git a/src/caffe/test/test_layer_factory.cpp b/src/caffe/test/test_layer_factory.cpp
index efb1b37..c86fafd 100644
@@ -1,11 +1,14 @@
 #include <map>
 #include <string>
 
+#include "boost/scoped_ptr.hpp"
 #include "gtest/gtest.h"
 
 #include "caffe/common.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/layer_factory.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
 
 #include "caffe/test/test_caffe_main.hpp"
 
@@ -21,11 +24,20 @@ TYPED_TEST(LayerFactoryTest, TestCreateLayer) {
   typename LayerRegistry<Dtype>::CreatorRegistry& registry =
       LayerRegistry<Dtype>::Registry();
   shared_ptr<Layer<Dtype> > layer;
-  LayerParameter layer_param;
   for (typename LayerRegistry<Dtype>::CreatorRegistry::iterator iter =
        registry.begin(); iter != registry.end(); ++iter) {
     // Special case: PythonLayer is checked by pytest
     if (iter->first == "Python") { continue; }
+    LayerParameter layer_param;
+    // Data layers expect a DB
+    if (iter->first == "Data") {
+      string tmp;
+      MakeTempDir(&tmp);
+      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+      db->Open(tmp, db::NEW);
+      db->Close();
+      layer_param.mutable_data_param()->set_source(tmp);
+    }
     layer_param.set_type(iter->first);
     layer = LayerRegistry<Dtype>::CreateLayer(layer_param);
     EXPECT_EQ(iter->first, layer->type());
diff --git a/src/caffe/test/test_upgrade_proto.cpp b/src/caffe/test/test_upgrade_proto.cpp
index eec6276..0067202 100644
@@ -2,12 +2,15 @@
 #include <string>
 #include <vector>
 
+#include "boost/scoped_ptr.hpp"
 #include "google/protobuf/text_format.h"
 #include "gtest/gtest.h"
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
 #include "caffe/layer.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
 #include "caffe/util/upgrade_proto.hpp"
 
 #include "caffe/test/test_caffe_main.hpp"
@@ -2901,6 +2904,15 @@ TEST_F(NetUpgradeTest, TestUpgradeV1LayerType) {
       continue;  // Empty string isn't actually a valid layer type.
     }
     layer_param.set_type(v2_layer_type);
+    // Data layers expect a DB
+    if (v2_layer_type == "Data") {
+      string tmp;
+      MakeTempDir(&tmp);
+      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+      db->Open(tmp, db::NEW);
+      db->Close();
+      layer_param.mutable_data_param()->set_source(tmp);
+    }
     layer = LayerRegistry<float>::CreateLayer(layer_param);
     EXPECT_EQ(v2_layer_type, layer->type());
   }
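
The temp-DB setup above now appears verbatim in two test files; a hypothetical
shared helper (not part of this commit) could factor it out:

    #include <string>

    #include "boost/scoped_ptr.hpp"
    #include "caffe/proto/caffe.pb.h"
    #include "caffe/util/db.hpp"
    #include "caffe/util/io.hpp"

    namespace caffe {

    // Create an empty LevelDB in a fresh temp dir and return its path, so
    // a Data layer can be constructed against a valid (if empty) source.
    std::string MakeTempLevelDB() {
      std::string tmp;
      MakeTempDir(&tmp);
      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
      db->Open(tmp, db::NEW);
      db->Close();
      return tmp;
    }

    }  // namespace caffe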
diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp
index 6ab6ba0..f7c53f2 100644
@@ -2,6 +2,7 @@
 #include <string>
 
 #include "caffe/data_layers.hpp"
+#include "caffe/data_reader.hpp"
 #include "caffe/util/blocking_queue.hpp"
 
 namespace caffe {
@@ -86,5 +87,7 @@ size_t BlockingQueue<T>::size() const {
 
 template class BlockingQueue<Batch<float>*>;
 template class BlockingQueue<Batch<double>*>;
+template class BlockingQueue<Datum*>;
+template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
 
 }  // namespace caffe
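
The two added instantiations are needed because BlockingQueue defines its
members in this .cpp file rather than in the header, so any payload type used
elsewhere must be instantiated here or the build fails at link time. A minimal
toy illustration of the pattern (invented names, not Caffe code):

    // queue.hpp would declare the template only:
    template <typename T>
    class Queue {
     public:
      void push(const T& t);
     private:
      T last_;
    };

    // queue.cpp supplies the definition plus explicit instantiations.
    // Without "template class Queue<int>;" every other .cpp that calls
    // Queue<int>::push fails to link with an undefined-reference error.
    template <typename T>
    void Queue<T>::push(const T& t) { last_ = t; }

    template class Queue<int>;
    template class Queue<double>;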