// This method may not be overridden except by the BasePrefetchingDataLayer.
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
// Data layers have no bottoms, so reshaping is trivial.
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
-
+ // DataLayer relies on DataReader, rather than layer sharing, for parallelism
+ virtual inline bool ShareInParallel() const { return false; }
virtual inline const char* type() const { return "Data"; }
virtual inline int ExactNumBottomBlobs() const { return 0; }
virtual inline int MinTopBlobs() const { return 1; }
: Layer<Dtype>(param) {}
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
virtual ~HDF5DataLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
virtual ~HDF5OutputLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
+ // Data layers should be shared by multiple solvers in parallel
+ virtual inline bool ShareInParallel() const { return true; }
// Data layers have no bottoms, so reshaping is trivial.
virtual void Reshape(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {}
#ifndef CAFFE_LAYER_H_
#define CAFFE_LAYER_H_
+#include <boost/thread.hpp>
#include <algorithm>
#include <string>
#include <vector>
const vector<Blob<Dtype>*>& top) {}
/**
+ * @brief Whether a layer should be shared by multiple nets during data
+ * parallelism. By default, all layers except for data layers should
+ * not be shared. Data layers should be shared to ensure that each
+ * worker solver accesses data sequentially during data parallelism.
+ */
+ virtual inline bool ShareInParallel() const { return false; }
+
+ /**
* @brief Adjust the shapes of top blobs and internal buffers to accommodate
* the shapes of the bottom blobs.
*
}
}
+ private:
+  // Mutex that serializes calls to Forward() so that, when a layer is shared
+  // across worker solvers, their forward passes run sequentially.
+  boost::mutex forward_mutex_;
+
DISABLE_COPY_AND_ASSIGN(Layer);
}; // class Layer
template <typename Dtype>
inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
+ // Lock during forward to ensure sequential forward
+ boost::mutex::scoped_lock lock(forward_mutex_);
Dtype loss = 0;
Reshape(bottom, top);
switch (Caffe::mode()) {
template <typename Dtype>
class Net {
public:
- explicit Net(const NetParameter& param);
- explicit Net(const string& param_file, Phase phase);
+ explicit Net(const NetParameter& param, const Net* root_net = NULL);
+ explicit Net(const string& param_file, Phase phase,
+ const Net* root_net = NULL);
virtual ~Net() {}
/// @brief Initialize a network with a NetParameter.
size_t memory_used_;
/// Whether to compute and display debug info for the net.
bool debug_info_;
-
+ /// The root net that actually holds the shared layers in data parallelism
+ const Net* const root_net_;
DISABLE_COPY_AND_ASSIGN(Net);
};
self_.attr("reshape")(bottom, top);
}
+ virtual inline bool ShareInParallel() const {
+ return this->layer_param_.python_param().share_in_parallel();
+ }
+
virtual inline const char* type() const { return "Python"; }
protected:
template <typename Dtype>
class Solver {
public:
- explicit Solver(const SolverParameter& param);
- explicit Solver(const string& param_file);
+ explicit Solver(const SolverParameter& param,
+ const Solver* root_solver = NULL);
+ explicit Solver(const string& param_file, const Solver* root_solver = NULL);
void Init(const SolverParameter& param);
void InitTrainNet();
void InitTestNets();
vector<shared_ptr<Net<Dtype> > > test_nets_;
vector<Callback*> callbacks_;
+ // The root solver that holds root nets (actually containing shared layers)
+ // in data parallelism
+ const Solver* const root_solver_;
+
DISABLE_COPY_AND_ASSIGN(Solver);
};
template <typename Dtype>
class WorkerSolver : public Solver<Dtype> {
public:
- explicit WorkerSolver(const SolverParameter& param)
- : Solver<Dtype>(param) {}
+ explicit WorkerSolver(const SolverParameter& param,
+ const Solver<Dtype>* root_solver = NULL)
+ : Solver<Dtype>(param, root_solver) {}
protected:
void ApplyUpdate() {}
namespace caffe {
template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param) {
+Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
+ : root_net_(root_net) {
Init(param);
}
template <typename Dtype>
-Net<Dtype>::Net(const string& param_file, Phase phase) {
+Net<Dtype>::Net(const string& param_file, Phase phase, const Net* root_net)
+ : root_net_(root_net) {
NetParameter param;
ReadNetParamsFromTextFileOrDie(param_file, &param);
param.mutable_state()->set_phase(phase);
template <typename Dtype>
void Net<Dtype>::Init(const NetParameter& in_param) {
+ CHECK(Caffe::root_solver() || root_net_)
+ << "root_net_ needs to be set for all non-root solvers";
// Set phase from the state.
phase_ = in_param.state().phase();
// Filter layers based on their include/exclude rules and
top_id_vecs_.resize(param.layer_size());
bottom_need_backward_.resize(param.layer_size());
for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
+ // For non-root solvers, whether this layer is shared from root_net_.
+ bool is_shared_layer = !Caffe::root_solver()
+ && root_net_->layers_[layer_id]->ShareInParallel();
// Inherit phase from net if unset.
if (!param.layer(layer_id).has_phase()) {
param.mutable_layer(layer_id)->set_phase(phase_);
<< "propagate_down param must be specified "
<< "either 0 or bottom_size times ";
}
- layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+ if (is_shared_layer) {
+ LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
+ layers_.push_back(root_net_->layers_[layer_id]);
+ } else {
+ layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
+ }
layer_names_.push_back(layer_param.name());
if (Caffe::root_solver()) {
LOG(INFO) << "Creating Layer " << layer_param.name();
}
}
// After this layer is connected, set it up.
+ if (is_shared_layer) {
+ // Set up size of top blobs using root_net_
+ const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id];
+ const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id];
+ for (int top_id = 0; top_id < base_top.size(); ++top_id) {
+ this_top[top_id]->ReshapeLike(*base_top[top_id]);
+ LOG(INFO) << "Created top blob " << top_id << " (shape: "
+ << this_top[top_id]->shape_string() << ") for shared layer "
+ << layer_param.name();
+ }
+ } else {
+ layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
+ }
if (Caffe::root_solver()) {
LOG(INFO) << "Setting up " << layer_names_[layer_id];
}
- layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) {
blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0));
solver_ = root_solver;
} else {
Caffe::set_root_solver(false);
- solver_.reset(new WorkerSolver<Dtype>(param));
+ solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
Caffe::set_root_solver(true);
}
this->configure(solver_.get());
INSTANTIATE_CLASS(P2PSync);
} // namespace caffe
-
// string, dictionary in Python dict format, JSON, etc. You may parse this
// string in `setup` method and use it in `forward` and `backward`.
optional string param_str = 3 [default = ''];
+ // Whether this PythonLayer is shared among worker solvers during data
+ // parallelism. If true, each worker solver sequentially runs forward from
+ // this layer. This value should be set to true if you are using it as a
+ // data layer.
+ optional bool share_in_parallel = 4 [default = false];
}
// Message that stores parameters used by ReductionLayer
namespace caffe {
template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param)
- : net_(), callbacks_() {
+Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
+ : net_(), callbacks_(), root_solver_(root_solver) {
Init(param);
}
template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file)
- : net_(), callbacks_() {
+Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
+ : net_(), callbacks_(), root_solver_(root_solver) {
SolverParameter param;
ReadProtoFromTextFileOrDie(param_file, &param);
Init(param);
template <typename Dtype>
void Solver<Dtype>::Init(const SolverParameter& param) {
+ CHECK(Caffe::root_solver() || root_solver_)
+ << "root_solver_ needs to be set for all non-root solvers";
LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
<< std::endl << param.DebugString();
param_ = param;
net_state.MergeFrom(net_param.state());
net_state.MergeFrom(param_.train_state());
net_param.mutable_state()->CopyFrom(net_state);
- net_.reset(new Net<Dtype>(net_param));
+ if (Caffe::root_solver()) {
+ net_.reset(new Net<Dtype>(net_param));
+ } else {
+ net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
+ }
}
template <typename Dtype>
net_params[i].mutable_state()->CopyFrom(net_state);
LOG(INFO)
<< "Creating test net (#" << i << ") specified by " << sources[i];
- test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+ if (Caffe::root_solver()) {
+ test_nets_[i].reset(new Net<Dtype>(net_params[i]));
+ } else {
+ test_nets_[i].reset(new Net<Dtype>(net_params[i],
+ root_solver_->test_nets_[i].get()));
+ }
test_nets_[i]->set_debug_info(param_.debug_info());
}
}