#include "caffe/util/math_functions.hpp"
#include "caffe/util/rng.hpp"
#include "caffe/vision_layers.hpp"
+#include "caffe/proto/caffe.pb.h"
using std::string;
const Dtype* mean = layer->data_mean_.cpu_data();
for (int item_id = 0; item_id < batch_size; ++item_id) {
// get a blob
- CHECK(layer->iter_);
- CHECK(layer->iter_->Valid());
- datum.ParseFromString(layer->iter_->value().ToString());
+ // Read the current record from whichever backend this layer was
+ // configured with: the LevelDB iterator or the LMDB cursor.
+ switch (layer->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ CHECK(layer->iter_);
+ CHECK(layer->iter_->Valid());
+ datum.ParseFromString(layer->iter_->value().ToString());
+ break;
+ case DataParameter_DB_LMDB:
+ // MDB_GET_CURRENT re-reads the key/value at the cursor's current
+ // position without moving it; advancing happens after the item is
+ // consumed, below.
+ CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
+ &layer->mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
+ // ParseFromArray parses straight out of the mapped value, avoiding
+ // the extra ToString() copy the LevelDB path makes.
+ datum.ParseFromArray(layer->mdb_value_.mv_data,
+ layer->mdb_value_.mv_size);
+ break;
+ default:
+ LOG(FATAL) << "Unknown database backend";
+ }
+
const string& data = datum.data();
if (crop_size) {
CHECK(data.size()) << "Image cropping only support uint8 data";
top_label[item_id] = datum.label();
}
// go to the next iter
- layer->iter_->Next();
- if (!layer->iter_->Valid()) {
- // We have reached the end. Restart from the first.
- DLOG(INFO) << "Restarting data prefetching from start.";
- layer->iter_->SeekToFirst();
+ // Advance to the next record; on end-of-data both backends wrap around
+ // to the first record so prefetching cycles through the dataset forever.
+ switch (layer->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ layer->iter_->Next();
+ if (!layer->iter_->Valid()) {
+ // We have reached the end. Restart from the first.
+ DLOG(INFO) << "Restarting data prefetching from start.";
+ layer->iter_->SeekToFirst();
+ }
+ break;
+ case DataParameter_DB_LMDB:
+ // mdb_cursor_get(MDB_NEXT) returns non-success (MDB_NOTFOUND) past
+ // the last record; rewind with MDB_FIRST in that case.
+ if (mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
+ &layer->mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+ // We have reached the end. Restart from the first.
+ DLOG(INFO) << "Restarting data prefetching from start.";
+ CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
+ &layer->mdb_value_, MDB_FIRST), MDB_SUCCESS);
+ }
+ break;
+ default:
+ LOG(FATAL) << "Unknown database backend";
}
}
// Destructor: join the prefetch thread first so it stops reading from the
// database handles that are torn down right after.
template <typename Dtype>
DataLayer<Dtype>::~DataLayer<Dtype>() {
JoinPrefetchThread();
+ // clean up the database resources
+ switch (this->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ // db_ and iter_ are smart pointers (see the reset() calls where the
+ // DB is opened), so the LevelDB handles release themselves.
+ break; // do nothing
+ case DataParameter_DB_LMDB:
+ // NOTE(review): mdb_close (i.e. mdb_dbi_close) is invoked while the
+ // read transaction is still live; LMDB docs recommend ending the txn
+ // before closing handles and discourage mdb_dbi_close for normal use
+ // — verify this teardown order against lmdb.h.
+ mdb_cursor_close(mdb_cursor_);
+ mdb_close(mdb_env_, mdb_dbi_);
+ mdb_txn_abort(mdb_txn_);
+ mdb_env_close(mdb_env_);
+ break;
+ default:
+ // NOTE(review): LOG(FATAL) aborts the process from inside a
+ // destructor; consider a non-fatal log here.
+ LOG(FATAL) << "Unknown database backend";
+ }
}
template <typename Dtype>
// NOTE(review): hunk boundary lost here — the code between the template line
// above and this else-branch is outside this patch view.
} else {
output_labels_ = true;
}
- // Initialize the leveldb
- leveldb::DB* db_temp;
- leveldb::Options options;
- options.create_if_missing = false;
- options.max_open_files = 100;
- LOG(INFO) << "Opening leveldb " << this->layer_param_.data_param().source();
- leveldb::Status status = leveldb::DB::Open(
- options, this->layer_param_.data_param().source(), &db_temp);
- CHECK(status.ok()) << "Failed to open leveldb "
- << this->layer_param_.data_param().source() << std::endl
- << status.ToString();
- db_.reset(db_temp);
- iter_.reset(db_->NewIterator(leveldb::ReadOptions()));
- iter_->SeekToFirst();
+ // Initialize DB
+ // Open the configured backend read-only and position the read pointer
+ // (LevelDB iterator / LMDB cursor) at the first record.
+ switch (this->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ {
+ leveldb::DB* db_temp;
+ leveldb::Options options;
+ options.create_if_missing = false;
+ options.max_open_files = 100;
+ LOG(INFO) << "Opening leveldb " << this->layer_param_.data_param().source();
+ leveldb::Status status = leveldb::DB::Open(
+ options, this->layer_param_.data_param().source(), &db_temp);
+ CHECK(status.ok()) << "Failed to open leveldb "
+ << this->layer_param_.data_param().source() << std::endl
+ << status.ToString();
+ db_.reset(db_temp);
+ iter_.reset(db_->NewIterator(leveldb::ReadOptions()));
+ iter_->SeekToFirst();
+ }
+ break;
+ case DataParameter_DB_LMDB:
+ CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed";
+ // Huge mapsize is presumably safe because LMDB only reserves virtual
+ // address space up front — TODO confirm against LMDB docs.
+ CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB
+ // MDB_NOTLS unties read transactions from their creating thread,
+ // presumably so the prefetch thread can use this cursor — verify.
+ CHECK_EQ(mdb_env_open(mdb_env_,
+ this->layer_param_.data_param().source().c_str(),
+ MDB_RDONLY|MDB_NOTLS, 0664), MDB_SUCCESS) << "mdb_env_open failed";
+ CHECK_EQ(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_), MDB_SUCCESS)
+ << "mdb_txn_begin failed";
+ CHECK_EQ(mdb_open(mdb_txn_, NULL, 0, &mdb_dbi_), MDB_SUCCESS)
+ << "mdb_open failed";
+ CHECK_EQ(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_), MDB_SUCCESS)
+ << "mdb_cursor_open failed";
+ LOG(INFO) << "Opening lmdb " << this->layer_param_.data_param().source();
+ CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_FIRST),
+ MDB_SUCCESS) << "mdb_cursor_get failed";
+ break;
+ default:
+ LOG(FATAL) << "Unknown database backend";
+ }
+
// Check if we would need to randomly skip a few data points
if (this->layer_param_.data_param().rand_skip()) {
unsigned int skip = caffe_rng_rand() %
this->layer_param_.data_param().rand_skip();
LOG(INFO) << "Skipping first " << skip << " data points.";
while (skip-- > 0) {
- iter_->Next();
- if (!iter_->Valid()) {
- iter_->SeekToFirst();
+ // NOTE(review): the backend switch is re-evaluated on every skipped
+ // record; hoisting it out of the while loop would be cheaper, though
+ // behavior is identical either way.
+ switch (this->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ iter_->Next();
+ if (!iter_->Valid()) {
+ iter_->SeekToFirst();
+ }
+ break;
+ case DataParameter_DB_LMDB:
+ if(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT)
+ != MDB_SUCCESS) {
+ CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_,
+ MDB_FIRST), MDB_SUCCESS);
+ }
+ break;
+ default:
+ LOG(FATAL) << "Unknown database backend";
}
}
}
// Read a data point, and use it to initialize the top blob.
Datum datum;
- datum.ParseFromString(iter_->value().ToString());
+ switch (this->layer_param_.data_param().backend()) {
+ case DataParameter_DB_LEVELDB:
+ datum.ParseFromString(iter_->value().ToString());
+ break;
+ case DataParameter_DB_LMDB:
+ datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size);
+ break;
+ default:
+ LOG(FATAL) << "Unknown database backend";
+ }
+
// image
int crop_size = this->layer_param_.data_param().crop_size();
if (crop_size > 0) {