LmdbDataset()
: env_(NULL),
dbi_(0),
- txn_(NULL) { }
+ write_txn_(NULL),
+ read_txn_(NULL) { }
bool open(const string& filename, Mode mode);
bool put(const K& key, const V& value);
dbi_(dbi) { }
shared_ptr<DatasetState> clone() {
+ CHECK(cursor_);
+
MDB_cursor* new_cursor;
+ int retval;
- if (cursor_) {
- int retval;
- retval = mdb_cursor_open(txn_, *dbi_, &new_cursor);
- CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
- MDB_val key;
- MDB_val val;
- retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT);
- CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
- retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET);
- CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval);
- } else {
- new_cursor = cursor_;
- }
+ retval = mdb_cursor_open(txn_, *dbi_, &new_cursor);
+ CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
+ MDB_val key;
+ MDB_val val;
+ retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT);
+ CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
+ retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET);
+ CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval);
return shared_ptr<DatasetState>(new LmdbState(new_cursor, txn_, dbi_));
}
MDB_env* env_;
MDB_dbi dbi_;
- MDB_txn* txn_;
+ MDB_txn* write_txn_;
+ MDB_txn* read_txn_;
};
} // namespace caffe
DLOG(INFO) << "LMDB: Open " << filename;
CHECK(NULL == env_);
- CHECK(NULL == txn_);
+ CHECK(NULL == write_txn_);
+ CHECK(NULL == read_txn_);
CHECK_EQ(0, dbi_);
int retval;
return false;
}
- retval = mdb_txn_begin(env_, NULL, flag2, &txn_);
+ retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
return false;
}
- retval = mdb_open(txn_, NULL, 0, &dbi_);
+ retval = mdb_txn_begin(env_, NULL, flag2, &write_txn_);
+ if (MDB_SUCCESS != retval) {
+ LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
+ return false;
+ }
+
+ retval = mdb_open(write_txn_, NULL, 0, &dbi_);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_open failed" << mdb_strerror(retval);
return false;
mdbkey.mv_size = serialized_key.size();
mdbkey.mv_data = serialized_key.data();
- CHECK_NOTNULL(txn_);
+ CHECK_NOTNULL(write_txn_);
CHECK_NE(0, dbi_);
- int retval = mdb_put(txn_, dbi_, &mdbkey, &mdbdata, 0);
+ int retval = mdb_put(write_txn_, dbi_, &mdbkey, &mdbdata, 0);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_put failed " << mdb_strerror(retval);
return false;
mdbkey.mv_size = serialized_key.size();
int retval;
- MDB_txn* get_txn;
- retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &get_txn);
- if (MDB_SUCCESS != retval) {
- LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
- return false;
- }
-
- retval = mdb_get(get_txn, dbi_, &mdbkey, &mdbdata);
+ retval = mdb_get(read_txn_, dbi_, &mdbkey, &mdbdata);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_get failed " << mdb_strerror(retval);
return false;
}
- mdb_txn_abort(get_txn);
-
if (!VCoder::deserialize(reinterpret_cast<char*>(mdbdata.mv_data),
mdbdata.mv_size, value)) {
LOG(ERROR) << "failed to deserialize value";
int retval;
- MDB_txn* iter_txn;
-
- retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
- CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
- << mdb_strerror(retval);
-
MDB_cursor* cursor;
- retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+ retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
MDB_val mdbkey;
MDB_val mdbval;
CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
mdb_cursor_close(cursor);
- mdb_txn_abort(iter_txn);
if (!KCoder::deserialize(reinterpret_cast<char*>(mdbkey.mv_data),
mdbkey.mv_size, key)) {
int retval;
- MDB_txn* iter_txn;
-
- retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
- CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
- << mdb_strerror(retval);
-
MDB_cursor* cursor;
- retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+ retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
MDB_val mdbkey;
MDB_val mdbval;
CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
mdb_cursor_close(cursor);
- mdb_txn_abort(iter_txn);
if (!KCoder::deserialize(reinterpret_cast<char*>(mdbkey.mv_data),
mdbkey.mv_size, key)) {
bool LmdbDataset<K, V, KCoder, VCoder>::commit() {
DLOG(INFO) << "LMDB: Commit";
- CHECK_NOTNULL(txn_);
+ CHECK_NOTNULL(write_txn_);
int retval;
- retval = mdb_txn_commit(txn_);
+ retval = mdb_txn_commit(write_txn_);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_txn_commit failed " << mdb_strerror(retval);
return false;
}
- retval = mdb_txn_begin(env_, NULL, 0, &txn_);
+ mdb_txn_abort(read_txn_);
+
+ retval = mdb_txn_begin(env_, NULL, 0, &write_txn_);
+ if (MDB_SUCCESS != retval) {
+ LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
+ return false;
+ }
+
+ retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_);
if (MDB_SUCCESS != retval) {
LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
return false;
DLOG(INFO) << "LMDB: Close";
if (env_ && dbi_) {
+ mdb_txn_abort(write_txn_);
+ mdb_txn_abort(read_txn_);
mdb_close(env_, dbi_);
mdb_env_close(env_);
env_ = NULL;
dbi_ = 0;
- txn_ = NULL;
+ write_txn_ = NULL;
+ read_txn_ = NULL;
}
}
LmdbDataset<K, V, KCoder, VCoder>::begin() const {
int retval;
- MDB_txn* iter_txn;
-
- retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
- CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
- << mdb_strerror(retval);
-
MDB_cursor* cursor;
- retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+ retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
MDB_val key;
MDB_val val;
shared_ptr<DatasetState> state;
if (MDB_SUCCESS == retval) {
- state.reset(new LmdbState(cursor, iter_txn, &dbi_));
+ state.reset(new LmdbState(cursor, read_txn_, &dbi_));
+ } else {
+ mdb_cursor_close(cursor);
}
return const_iterator(this, state);
}
EXPECT_FALSE(dataset->get(key, &new_value));
}
+TYPED_TEST(DatasetTest, TestCreateManyItersShortScope) {
+ UNPACK_TYPES;
+
+ string name = this->DBName();
+ shared_ptr<Dataset<string, value_type> > dataset =
+ DatasetFactory<string, value_type>(backend);
+ EXPECT_TRUE(dataset->open(name, Dataset<string, value_type>::New));
+
+ string key = this->TestKey();
+ value_type value = this->TestValue();
+ EXPECT_TRUE(dataset->put(key, value));
+ EXPECT_TRUE(dataset->commit());
+
+ for (int i = 0; i < 1000; ++i) {
+ typename Dataset<string, value_type>::const_iterator iter =
+ dataset->begin();
+ }
+}
+
+TYPED_TEST(DatasetTest, TestCreateManyItersLongScope) {
+ UNPACK_TYPES;
+
+ string name = this->DBName();
+ shared_ptr<Dataset<string, value_type> > dataset =
+ DatasetFactory<string, value_type>(backend);
+ EXPECT_TRUE(dataset->open(name, Dataset<string, value_type>::New));
+
+ string key = this->TestKey();
+ value_type value = this->TestValue();
+ EXPECT_TRUE(dataset->put(key, value));
+ EXPECT_TRUE(dataset->commit());
+
+ vector<typename Dataset<string, value_type>::const_iterator> iters;
+ for (int i = 0; i < 1000; ++i) {
+ iters.push_back(dataset->begin());
+ }
+}
+
#undef UNPACK_TYPES
} // namespace caffe