Data Layers Parallel for Multi-GPU
authorRonghang Hu <huronghang@hotmail.com>
Wed, 12 Aug 2015 04:38:06 +0000 (21:38 -0700)
committerRonghang Hu <huronghang@hotmail.com>
Wed, 12 Aug 2015 17:51:45 +0000 (10:51 -0700)
Allow data layers (and also PythonLayer when used as data layer) to be shared
among worker solver's training net, and also test net for future-proof if one
wants to do Multi-GPU testing. Data layers are locked during forward to ensure
sequential forward.

include/caffe/data_layers.hpp
include/caffe/layer.hpp
include/caffe/net.hpp
include/caffe/python_layer.hpp
include/caffe/solver.hpp
src/caffe/net.cpp
src/caffe/parallel.cpp
src/caffe/proto/caffe.proto
src/caffe/solver.cpp

index 12e6c36..552d814 100644 (file)
@@ -34,6 +34,8 @@ class BaseDataLayer : public Layer<Dtype> {
   // This method may not be overridden except by the BasePrefetchingDataLayer.
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
+  // Data layers should be shared by multiple solvers in parallel
+  virtual inline bool ShareInParallel() const { return true; }
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {}
   // Data layers have no bottoms, so reshaping is trivial.
@@ -94,7 +96,8 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
   virtual ~DataLayer();
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
-
+  // DataLayer uses DataReader instead for sharing for parallelism
+  virtual inline bool ShareInParallel() const { return false; }
   virtual inline const char* type() const { return "Data"; }
   virtual inline int ExactNumBottomBlobs() const { return 0; }
   virtual inline int MinTopBlobs() const { return 1; }
@@ -118,6 +121,8 @@ class DummyDataLayer : public Layer<Dtype> {
       : Layer<Dtype>(param) {}
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
+  // Data layers should be shared by multiple solvers in parallel
+  virtual inline bool ShareInParallel() const { return true; }
   // Data layers have no bottoms, so reshaping is trivial.
   virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {}
@@ -151,6 +156,8 @@ class HDF5DataLayer : public Layer<Dtype> {
   virtual ~HDF5DataLayer();
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
+  // Data layers should be shared by multiple solvers in parallel
+  virtual inline bool ShareInParallel() const { return true; }
   // Data layers have no bottoms, so reshaping is trivial.
   virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {}
@@ -192,6 +199,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
   virtual ~HDF5OutputLayer();
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
+  // Data layers should be shared by multiple solvers in parallel
+  virtual inline bool ShareInParallel() const { return true; }
   // Data layers have no bottoms, so reshaping is trivial.
   virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {}
index 0771b6a..d82197a 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef CAFFE_LAYER_H_
 #define CAFFE_LAYER_H_
 
+#include <boost/thread.hpp>
 #include <algorithm>
 #include <string>
 #include <vector>
@@ -86,6 +87,14 @@ class Layer {
       const vector<Blob<Dtype>*>& top) {}
 
   /**
+   * @brief Whether a layer should be shared by multiple nets during data
+   *        parallelism. By default, all layers except for data layers should
+   *        not be shared. data layers should be shared to ensure each worker
+   *        solver access data sequentially during data parallelism.
+   */
+  virtual inline bool ShareInParallel() const { return false; }
+
+  /**
    * @brief Adjust the shapes of top blobs and internal buffers to accommodate
    *        the shapes of the bottom blobs.
    *
@@ -396,6 +405,10 @@ class Layer {
     }
   }
 
+ private:
+  // mutex to lock layer to ensure sequential forward
+  boost::mutex forward_mutex_;
+
   DISABLE_COPY_AND_ASSIGN(Layer);
 };  // class Layer
 
@@ -405,6 +418,8 @@ class Layer {
 template <typename Dtype>
 inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
     const vector<Blob<Dtype>*>& top) {
+  // Lock during forward to ensure sequential forward
+  boost::mutex::scoped_lock lock(forward_mutex_);
   Dtype loss = 0;
   Reshape(bottom, top);
   switch (Caffe::mode()) {
index bf99755..1bf07d2 100644 (file)
@@ -23,8 +23,9 @@ namespace caffe {
 template <typename Dtype>
 class Net {
  public:
-  explicit Net(const NetParameter& param);
-  explicit Net(const string& param_file, Phase phase);
+  explicit Net(const NetParameter& param, const Net* root_net = NULL);
+  explicit Net(const string& param_file, Phase phase,
+      const Net* root_net = NULL);
   virtual ~Net() {}
 
   /// @brief Initialize a network with a NetParameter.
@@ -291,7 +292,8 @@ class Net {
   size_t memory_used_;
   /// Whether to compute and display debug info for the net.
   bool debug_info_;
-
+  /// The root net that actually holds the shared layers in data parallelism
+  const Net* const root_net_;
   DISABLE_COPY_AND_ASSIGN(Net);
 };
 
index 2957e74..c43c1e8 100644 (file)
@@ -27,6 +27,10 @@ class PythonLayer : public Layer<Dtype> {
     self_.attr("reshape")(bottom, top);
   }
 
+  virtual inline bool ShareInParallel() const {
+    return this->layer_param_.python_param().share_in_parallel();
+  }
+
   virtual inline const char* type() const { return "Python"; }
 
  protected:
index 89a6c76..f583324 100644 (file)
@@ -17,8 +17,9 @@ namespace caffe {
 template <typename Dtype>
 class Solver {
  public:
-  explicit Solver(const SolverParameter& param);
-  explicit Solver(const string& param_file);
+  explicit Solver(const SolverParameter& param,
+      const Solver* root_solver = NULL);
+  explicit Solver(const string& param_file, const Solver* root_solver = NULL);
   void Init(const SolverParameter& param);
   void InitTrainNet();
   void InitTestNets();
@@ -79,6 +80,10 @@ class Solver {
   vector<shared_ptr<Net<Dtype> > > test_nets_;
   vector<Callback*> callbacks_;
 
+  // The root solver that holds root nets (actually containing shared layers)
+  // in data parallelism
+  const Solver* const root_solver_;
+
   DISABLE_COPY_AND_ASSIGN(Solver);
 };
 
@@ -89,8 +94,9 @@ class Solver {
 template <typename Dtype>
 class WorkerSolver : public Solver<Dtype> {
  public:
-  explicit WorkerSolver(const SolverParameter& param)
-      : Solver<Dtype>(param) {}
+  explicit WorkerSolver(const SolverParameter& param,
+      const Solver<Dtype>* root_solver = NULL)
+      : Solver<Dtype>(param, root_solver) {}
 
  protected:
   void ApplyUpdate() {}
index 5d0f432..14f8385 100644 (file)
 namespace caffe {
 
 template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param) {
+Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
+    : root_net_(root_net) {
   Init(param);
 }
 
 template <typename Dtype>
-Net<Dtype>::Net(const string& param_file, Phase phase) {
+Net<Dtype>::Net(const string& param_file, Phase phase, const Net* root_net)
+    : root_net_(root_net) {
   NetParameter param;
   ReadNetParamsFromTextFileOrDie(param_file, &param);
   param.mutable_state()->set_phase(phase);
@@ -36,6 +38,8 @@ Net<Dtype>::Net(const string& param_file, Phase phase) {
 
 template <typename Dtype>
 void Net<Dtype>::Init(const NetParameter& in_param) {
+  CHECK(Caffe::root_solver() || root_net_)
+      << "root_net_ needs to be set for all non-root solvers";
   // Set phase from the state.
   phase_ = in_param.state().phase();
   // Filter layers based on their include/exclude rules and
@@ -79,6 +83,9 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
   top_id_vecs_.resize(param.layer_size());
   bottom_need_backward_.resize(param.layer_size());
   for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
+    // For non-root solvers, whether this layer is shared from root_net_.
+    bool is_shared_layer = !Caffe::root_solver()
+        && root_net_->layers_[layer_id]->ShareInParallel();
     // Inherit phase from net if unset.
     if (!param.layer(layer_id).has_phase()) {
       param.mutable_layer(layer_id)->set_phase(phase_);
@@ -91,7 +98,12 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
           << "propagate_down param must be specified "
           << "either 0 or bottom_size times ";
     }
-    layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+    if (is_shared_layer) {
+      LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
+      layers_.push_back(root_net_->layers_[layer_id]);
+    } else {
+      layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+    }
     layer_names_.push_back(layer_param.name());
     if (Caffe::root_solver()) {
       LOG(INFO) << "Creating Layer " << layer_param.name();
@@ -125,10 +137,22 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
       }
     }
     // After this layer is connected, set it up.
+    if (is_shared_layer) {
+      // Set up size of top blobs using root_net_
+      const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id];
+      const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id];
+      for (int top_id = 0; top_id < base_top.size(); ++top_id) {
+        this_top[top_id]->ReshapeLike(*base_top[top_id]);
+        LOG(INFO) << "Created top blob " << top_id << " (shape: "
+            << this_top[top_id]->shape_string() <<  ") for shared layer "
+            << layer_param.name();
+      }
+    } else {
+      layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
+    }
     if (Caffe::root_solver()) {
       LOG(INFO) << "Setting up " << layer_names_[layer_id];
     }
-    layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
     for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
       if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) {
         blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0));
index 5a08df6..6e7d802 100644 (file)
@@ -218,7 +218,7 @@ P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
     solver_ = root_solver;
   } else {
     Caffe::set_root_solver(false);
-    solver_.reset(new WorkerSolver<Dtype>(param));
+    solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
     Caffe::set_root_solver(true);
   }
   this->configure(solver_.get());
@@ -436,4 +436,3 @@ INSTANTIATE_CLASS(GPUParams);
 INSTANTIATE_CLASS(P2PSync);
 
 }  // namespace caffe
-
index 4116541..e78c668 100644 (file)
@@ -740,6 +740,10 @@ message PythonParameter {
   // string, dictionary in Python dict format, JSON, etc. You may parse this
   // string in `setup` method and use it in `forward` and `backward`.
   optional string param_str = 3 [default = ''];
+  // Whether this PythonLayer is shared among worker solvers during data parallelism.
+  // If true, each worker solver sequentially run forward from this layer.
+  // This value should be set true if you are using it as a data layer.
+  optional bool share_in_parallel = 4 [default = false];
 }
 
 // Message that stores parameters used by ReductionLayer
index b6fd6b6..a44ff88 100644 (file)
 namespace caffe {
 
 template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param)
-    : net_(), callbacks_() {
+Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
+    : net_(), callbacks_(), root_solver_(root_solver) {
   Init(param);
 }
 
 template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file)
-    : net_(), callbacks_() {
+Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
+    : net_(), callbacks_(), root_solver_(root_solver) {
   SolverParameter param;
   ReadProtoFromTextFileOrDie(param_file, &param);
   Init(param);
@@ -33,6 +33,8 @@ Solver<Dtype>::Solver(const string& param_file)
 
 template <typename Dtype>
 void Solver<Dtype>::Init(const SolverParameter& param) {
+  CHECK(Caffe::root_solver() || root_solver_)
+      << "root_solver_ needs to be set for all non-root solvers";
   LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
     << std::endl << param.DebugString();
   param_ = param;
@@ -88,7 +90,11 @@ void Solver<Dtype>::InitTrainNet() {
   net_state.MergeFrom(net_param.state());
   net_state.MergeFrom(param_.train_state());
   net_param.mutable_state()->CopyFrom(net_state);
-  net_.reset(new Net<Dtype>(net_param));
+  if (Caffe::root_solver()) {
+    net_.reset(new Net<Dtype>(net_param));
+  } else {
+    net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
+  }
 }
 
 template <typename Dtype>
@@ -163,7 +169,12 @@ void Solver<Dtype>::InitTestNets() {
     net_params[i].mutable_state()->CopyFrom(net_state);
     LOG(INFO)
         << "Creating test net (#" << i << ") specified by " << sources[i];
-    test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+    if (Caffe::root_solver()) {
+      test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+    } else {
+      test_nets_[i].reset(new Net<Dtype>(net_params[i],
+          root_solver_->test_nets_[i].get()));
+    }
     test_nets_[i]->set_debug_info(param_.debug_info());
   }
 }