#include <utility>
#include <vector>
-#include "boost/scoped_ptr.hpp"
#include "hdf5.h"
#include "caffe/blob.hpp"
#include "caffe/common.hpp"
+#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
- explicit DataLayer(const LayerParameter& param)
- : BasePrefetchingDataLayer<Dtype>(param) {}
+ explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
protected:
virtual void load_batch(Batch<Dtype>* batch);
- shared_ptr<db::DB> db_;
- shared_ptr<db::Cursor> cursor_;
+ DataReader reader_;
};
--- /dev/null
+#ifndef CAFFE_DATA_READER_HPP_
+#define CAFFE_DATA_READER_HPP_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/internal_thread.hpp"
+#include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/db.hpp"
+
+namespace caffe {
+
+/**
+ * @brief Reads data from a source into queues available to data layers.
+ * A single reading thread is created per source, even if multiple solvers
+ * are running in parallel, e.g. for multi-GPU training. This ensures the
+ * database is read sequentially, and that each solver accesses a different
+ * subset of it. Data is distributed to solvers in a round-robin way to keep
+ * parallel training deterministic.
+ */
+class DataReader {
+ public:
+ explicit DataReader(const LayerParameter& param);
+ ~DataReader();
+
+ inline BlockingQueue<Datum*>& free() const {
+ return queue_pair_->free_;
+ }
+ inline BlockingQueue<Datum*>& full() const {
+ return queue_pair_->full_;
+ }
+
+ protected:
+ // Queue pairs are shared between a body and its readers
+ class QueuePair {
+ public:
+ explicit QueuePair(int size);
+ ~QueuePair();
+
+ BlockingQueue<Datum*> free_;
+ BlockingQueue<Datum*> full_;
+
+ DISABLE_COPY_AND_ASSIGN(QueuePair);
+ };
+
+ // A single body is created per source
+ class Body : public InternalThread {
+ public:
+ explicit Body(const LayerParameter& param);
+ virtual ~Body();
+
+ protected:
+ void InternalThreadEntry();
+ void read_one(db::Cursor* cursor, QueuePair* qp);
+
+ const LayerParameter param_;
+ BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
+
+ friend class DataReader;
+
+ DISABLE_COPY_AND_ASSIGN(Body);
+ };
+
+ // A source is uniquely identified by its layer name + path, in case
+ // the same database is read from two different locations in the net.
+ static inline string source_key(const LayerParameter& param) {
+ return param.name() + ":" + param.data_param().source();
+ }
+
+ const shared_ptr<QueuePair> queue_pair_;
+ shared_ptr<Body> body_;
+
+ static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
+
+  DISABLE_COPY_AND_ASSIGN(DataReader);
+};
+
+} // namespace caffe
+
+#endif // CAFFE_DATA_READER_HPP_
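
For orientation, a consumer's round-trip through the two queues looks like this. A minimal sketch, not part of the patch, assuming a constructed DataReader named reader; consume() is a hypothetical stand-in for whatever processes the datum:

Datum* datum = reader.full().pop();  // blocks until the reading thread fills one
consume(*datum);                     // e.g. decode and transform
reader.free().push(datum);           // recycle the Datum so the thread can refill it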
--- /dev/null
+#include <boost/thread.hpp>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/data_layers.hpp"
+#include "caffe/data_reader.hpp"
+#include "caffe/proto/caffe.pb.h"
+
+namespace caffe {
+
+using boost::weak_ptr;
+
+map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
+static boost::mutex bodies_mutex_;
+
+DataReader::DataReader(const LayerParameter& param)
+ : queue_pair_(new QueuePair( //
+ param.data_param().prefetch() * param.data_param().batch_size())) {
+ // Get or create a body
+ boost::mutex::scoped_lock lock(bodies_mutex_);
+ string key = source_key(param);
+ weak_ptr<Body>& weak = bodies_[key];
+ body_ = weak.lock();
+ if (!body_) {
+ body_.reset(new Body(param));
+ bodies_[key] = weak_ptr<Body>(body_);
+ }
+ body_->new_queue_pairs_.push(queue_pair_);
+}
+
+DataReader::~DataReader() {
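+  // Compute the key before dropping body_: if this was the last reader on
+  // the source, the registry's weak_ptr expires and the entry is erased.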
+ string key = source_key(body_->param_);
+ body_.reset();
+ boost::mutex::scoped_lock lock(bodies_mutex_);
+ if (bodies_[key].expired()) {
+ bodies_.erase(key);
+ }
+}
+
+//
+
+DataReader::QueuePair::QueuePair(int size) {
+  // Initialize the free queue with the requested number of datums
+ for (int i = 0; i < size; ++i) {
+ free_.push(new Datum());
+ }
+}
+
+DataReader::QueuePair::~QueuePair() {
+ Datum* datum;
+ while (free_.try_pop(&datum)) {
+ delete datum;
+ }
+ while (full_.try_pop(&datum)) {
+ delete datum;
+ }
+}
+
+//
+
+DataReader::Body::Body(const LayerParameter& param)
+ : param_(param),
+ new_queue_pairs_() {
+ StartInternalThread();
+}
+
+DataReader::Body::~Body() {
+ StopInternalThread();
+}
+
+void DataReader::Body::InternalThreadEntry() {
+ shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
+ db->Open(param_.data_param().source(), db::READ);
+ shared_ptr<db::Cursor> cursor(db->NewCursor());
+ vector<shared_ptr<QueuePair> > qps;
+ try {
+ // int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
+ // TODO single solver until multi-gpu merge
+ int solver_count = 1;
+
+    // To ensure deterministic runs, only start running once all solvers
+    // are ready. But solvers need to peek at one item during initialization,
+    // so read one item, then wait for the next solver.
+ for (int i = 0; i < solver_count; ++i) {
+ shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
+ read_one(cursor.get(), qp.get());
+ qps.push_back(qp);
+ }
+ // Main loop
+ while (!must_stop()) {
+ for (int i = 0; i < solver_count; ++i) {
+ read_one(cursor.get(), qps[i].get());
+ }
+      // Check that no additional readers have been created. This can happen
+      // if more than one net is trained at a time per process, whether
+      // single or multi solver. It might also happen if two data layers
+      // have the same name and source.
+ CHECK_EQ(new_queue_pairs_.size(), 0);
+ }
+ } catch (boost::thread_interrupted&) {
+ // Interrupted exception is expected on shutdown
+ }
+}
+
+void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
+ Datum* datum = qp->free_.pop();
+ // TODO deserialize in-place instead of copy?
+ datum->ParseFromString(cursor->value());
+ qp->full_.push(datum);
+
+ // go to the next iter
+ cursor->Next();
+ if (!cursor->valid()) {
+ DLOG(INFO) << "Restarting data prefetching from start.";
+ cursor->SeekToFirst();
+ }
+}
+
+} // namespace caffe
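
The constructor's get-or-create logic is a general weak_ptr registry idiom worth seeing in isolation. A minimal sketch, with locking elided and a hypothetical Resource type standing in for Body:

#include <map>
#include <string>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

struct Resource {};  // hypothetical stand-in for DataReader::Body

static std::map<std::string, boost::weak_ptr<Resource> > registry;

boost::shared_ptr<Resource> get_or_create(const std::string& key) {
  boost::weak_ptr<Resource>& weak = registry[key];
  boost::shared_ptr<Resource> strong = weak.lock();  // alive from a prior call?
  if (!strong) {
    strong.reset(new Resource());
    weak = strong;  // the registry holds only a non-owning reference
  }
  return strong;  // callers with the same key share one instance
}

The last shared_ptr owner to go away destroys the Resource; DataReader::~DataReader additionally erases the expired map entry, as above.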
DLOG(INFO) << "Prefetch copied";
if (this->output_labels_) {
// Reshape to loaded labels.
- top[1]->ReshapeLike(prefetch_label_);
+ top[1]->ReshapeLike(batch->label_);
// Copy the labels.
caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
top[1]->mutable_cpu_data());
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/benchmark.hpp"
#include "caffe/util/io.hpp"
-#include "caffe/util/math_functions.hpp"
-#include "caffe/util/rng.hpp"
namespace caffe {
template <typename Dtype>
+DataLayer<Dtype>::DataLayer(const LayerParameter& param)
+ : BasePrefetchingDataLayer<Dtype>(param),
+ reader_(param) {
+}
+
+template <typename Dtype>
DataLayer<Dtype>::~DataLayer() {
this->StopInternalThread();
}
template <typename Dtype>
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
- // Initialize DB
- db_.reset(db::GetDB(this->layer_param_.data_param().backend()));
- db_->Open(this->layer_param_.data_param().source(), db::READ);
- cursor_.reset(db_->NewCursor());
+ const int batch_size = this->layer_param_.data_param().batch_size();
+ // Read a data point, and use it to initialize the top blob.
+ Datum& datum = *(reader_.full().peek());
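+  // peek() returns the first datum without removing it from the queue, so
+  // the first load_batch() call will still consume it.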
- // Check if we should randomly skip a few data points
- if (this->layer_param_.data_param().rand_skip()) {
- unsigned int skip = caffe_rng_rand() %
- this->layer_param_.data_param().rand_skip();
- LOG(INFO) << "Skipping first " << skip << " data points.";
- while (skip-- > 0) {
- cursor_->Next();
- }
- }
- // Read a data point, to initialize the prefetch and top blobs.
- Datum datum;
- datum.ParseFromString(cursor_->value());
// Use data_transformer to infer the expected blob shape from datum.
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
this->transformed_data_.Reshape(top_shape);
// Reshape top[0] and prefetch_data according to the batch_size.
- top_shape[0] = this->layer_param_.data_param().batch_size();
- this->prefetch_data_.Reshape(top_shape);
- top[0]->ReshapeLike(this->prefetch_data_);
-
+ top_shape[0] = batch_size;
+ top[0]->Reshape(top_shape);
+ for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
+ this->prefetch_[i].data_.Reshape(top_shape);
+ }
LOG(INFO) << "output data size: " << top[0]->num() << ","
<< top[0]->channels() << "," << top[0]->height() << ","
<< top[0]->width();
// Reshape according to the first datum of each batch; reshaping on
// single input batches allows for inputs of varying dimension.
const int batch_size = this->layer_param_.data_param().batch_size();
- Datum datum;
- datum.ParseFromString(cursor_->value());
+ Datum& datum = *(reader_.full().peek());
// Use data_transformer to infer the expected blob shape from datum.
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
this->transformed_data_.Reshape(top_shape);
- // Reshape prefetch_data according to the batch_size.
+ // Reshape batch according to the batch_size.
top_shape[0] = batch_size;
batch->data_.Reshape(top_shape);
if (this->output_labels_) {
top_label = batch->label_.mutable_cpu_data();
}
- timer.Start();
for (int item_id = 0; item_id < batch_size; ++item_id) {
+ timer.Start();
// get a datum
- Datum datum;
- datum.ParseFromString(cursor_->value());
+ Datum& datum = *(reader_.full().pop("Waiting for data"));
read_time += timer.MicroSeconds();
timer.Start();
// Apply data transformations (mirror, scale, crop...)
top_label[item_id] = datum.label();
}
trans_time += timer.MicroSeconds();
- timer.Start();
- // go to the next item.
- cursor_->Next();
- if (!cursor_->valid()) {
- DLOG(INFO) << "Restarting data prefetching from start.";
- cursor_->SeekToFirst();
- }
+
+ reader_.free().push(const_cast<Datum*>(&datum));
}
timer.Stop();
batch_timer.Stop();
// to prevent all asynchronous sgd clients from starting at the same point. The skip
// point would be set as rand_skip * rand(0,1). Note that rand_skip should not
// be larger than the number of keys in the database.
+ // DEPRECATED. Each solver accesses a different subset of the database.
optional uint32 rand_skip = 7 [default = 0];
optional DB backend = 8 [default = LEVELDB];
// DEPRECATED. See TransformationParameter. For data pre-processing, we can do
optional bool mirror = 6 [default = false];
// Force the encoded image to have 3 color channels
optional bool force_encoded_color = 9 [default = false];
+// Prefetch queue (number of batches to prefetch to host memory; increase if
+// data access bandwidth varies).
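+// For example, in a net definition (source path hypothetical):
+//   data_param { source: "train_lmdb" backend: LMDB batch_size: 64 prefetch: 8 }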
+ optional uint32 prefetch = 10 [default = 4];
}
message DropoutParameter {
#include <map>
#include <string>
+#include "boost/scoped_ptr.hpp"
#include "gtest/gtest.h"
#include "caffe/common.hpp"
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
#include "caffe/test/test_caffe_main.hpp"
typename LayerRegistry<Dtype>::CreatorRegistry& registry =
LayerRegistry<Dtype>::Registry();
shared_ptr<Layer<Dtype> > layer;
- LayerParameter layer_param;
for (typename LayerRegistry<Dtype>::CreatorRegistry::iterator iter =
registry.begin(); iter != registry.end(); ++iter) {
// Special case: PythonLayer is checked by pytest
if (iter->first == "Python") { continue; }
+ LayerParameter layer_param;
+ // Data layers expect a DB
+ if (iter->first == "Data") {
+ string tmp;
+ MakeTempDir(&tmp);
+ boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+ db->Open(tmp, db::NEW);
+ db->Close();
+ layer_param.mutable_data_param()->set_source(tmp);
+ }
layer_param.set_type(iter->first);
layer = LayerRegistry<Dtype>::CreateLayer(layer_param);
EXPECT_EQ(iter->first, layer->type());
#include <string>
#include <vector>
+#include "boost/scoped_ptr.hpp"
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/layer.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
#include "caffe/util/upgrade_proto.hpp"
#include "caffe/test/test_caffe_main.hpp"
continue; // Empty string isn't actually a valid layer type.
}
layer_param.set_type(v2_layer_type);
+ // Data layers expect a DB
+ if (v2_layer_type == "Data") {
+ string tmp;
+ MakeTempDir(&tmp);
+ boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+ db->Open(tmp, db::NEW);
+ db->Close();
+ layer_param.mutable_data_param()->set_source(tmp);
+ }
layer = LayerRegistry<float>::CreateLayer(layer_param);
EXPECT_EQ(v2_layer_type, layer->type());
}
#include <string>
#include "caffe/data_layers.hpp"
+#include "caffe/data_reader.hpp"
#include "caffe/util/blocking_queue.hpp"
namespace caffe {
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
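+// DataReader passes Datum pointers and shared QueuePairs between threads;
+// BlockingQueue's member definitions live in this file, so each queued type
+// needs an explicit instantiation here.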
+template class BlockingQueue<Datum*>;
+template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
} // namespace caffe