From: Kevin James Matzen Date: Sat, 18 Oct 2014 06:51:29 +0000 (-0400) Subject: LMDB doesn't support many concurrent read-only transactions, so this preallocates... X-Git-Tag: submit/tizen/20180823.020014~572^2~100^2 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=e41ddbd3d70eff49a7bab430bb085d3b89647b4d;p=platform%2Fupstream%2Fcaffeonacl.git LMDB doesn't support many concurrent read-only transactions, so this preallocates one read-only transaction and reuses it. It's very important that iterations are considered invalid after a commit has been performed. --- diff --git a/include/caffe/dataset.hpp b/include/caffe/dataset.hpp index 90deb31..1dd8458 100644 --- a/include/caffe/dataset.hpp +++ b/include/caffe/dataset.hpp @@ -170,7 +170,6 @@ class Dataset { iterator(const Dataset* parent, shared_ptr state) : parent_(parent), state_(state) { } - ~iterator() { } iterator(const iterator& other) : parent_(other.parent_), diff --git a/include/caffe/leveldb_dataset.hpp b/include/caffe/leveldb_dataset.hpp index f6c57fe..d58c181 100644 --- a/include/caffe/leveldb_dataset.hpp +++ b/include/caffe/leveldb_dataset.hpp @@ -61,12 +61,11 @@ class LeveldbDataset : public Dataset { shared_ptr clone() { shared_ptr new_iter; - if (iter_.get()) { - new_iter.reset(db_->NewIterator(leveldb::ReadOptions())); - CHECK(iter_->Valid()); - new_iter->Seek(iter_->key()); - CHECK(new_iter->Valid()); - } + CHECK(iter_.get()); + new_iter.reset(db_->NewIterator(leveldb::ReadOptions())); + CHECK(iter_->Valid()); + new_iter->Seek(iter_->key()); + CHECK(new_iter->Valid()); return shared_ptr(new LeveldbState(db_, new_iter)); } diff --git a/include/caffe/lmdb_dataset.hpp b/include/caffe/lmdb_dataset.hpp index 71b3224..ac1e5ee 100644 --- a/include/caffe/lmdb_dataset.hpp +++ b/include/caffe/lmdb_dataset.hpp @@ -28,7 +28,8 @@ class LmdbDataset : public Dataset { LmdbDataset() : env_(NULL), dbi_(0), - txn_(NULL) { } + write_txn_(NULL), + read_txn_(NULL) { } bool open(const string& filename, Mode mode); bool put(const K& key, const V& value); @@ -55,21 +56,19 @@ class LmdbDataset : public Dataset { dbi_(dbi) { } shared_ptr clone() { + CHECK(cursor_); + MDB_cursor* new_cursor; + int retval; - if (cursor_) { - int retval; - retval = mdb_cursor_open(txn_, *dbi_, &new_cursor); - CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); - MDB_val key; - MDB_val val; - retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT); - CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); - retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET); - CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval); - } else { - new_cursor = cursor_; - } + retval = mdb_cursor_open(txn_, *dbi_, &new_cursor); + CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); + MDB_val key; + MDB_val val; + retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT); + CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); + retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET); + CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval); return shared_ptr(new LmdbState(new_cursor, txn_, dbi_)); } @@ -87,7 +86,8 @@ class LmdbDataset : public Dataset { MDB_env* env_; MDB_dbi dbi_; - MDB_txn* txn_; + MDB_txn* write_txn_; + MDB_txn* read_txn_; }; } // namespace caffe diff --git a/src/caffe/lmdb_dataset.cpp b/src/caffe/lmdb_dataset.cpp index 23852f0..8f8e68e 100644 --- a/src/caffe/lmdb_dataset.cpp +++ b/src/caffe/lmdb_dataset.cpp @@ -15,7 +15,8 @@ bool LmdbDataset::open(const string& filename, DLOG(INFO) << "LMDB: Open " << filename; CHECK(NULL == env_); - CHECK(NULL == txn_); + CHECK(NULL == write_txn_); + CHECK(NULL == read_txn_); CHECK_EQ(0, dbi_); int retval; @@ -66,13 +67,19 @@ bool LmdbDataset::open(const string& filename, return false; } - retval = mdb_txn_begin(env_, NULL, flag2, &txn_); + retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval); return false; } - retval = mdb_open(txn_, NULL, 0, &dbi_); + retval = mdb_txn_begin(env_, NULL, flag2, &write_txn_); + if (MDB_SUCCESS != retval) { + LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval); + return false; + } + + retval = mdb_open(write_txn_, NULL, 0, &dbi_); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_open failed" << mdb_strerror(retval); return false; @@ -103,10 +110,10 @@ bool LmdbDataset::put(const K& key, const V& value) { mdbkey.mv_size = serialized_key.size(); mdbkey.mv_data = serialized_key.data(); - CHECK_NOTNULL(txn_); + CHECK_NOTNULL(write_txn_); CHECK_NE(0, dbi_); - int retval = mdb_put(txn_, dbi_, &mdbkey, &mdbdata, 0); + int retval = mdb_put(write_txn_, dbi_, &mdbkey, &mdbdata, 0); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_put failed " << mdb_strerror(retval); return false; @@ -130,21 +137,12 @@ bool LmdbDataset::get(const K& key, V* value) { mdbkey.mv_size = serialized_key.size(); int retval; - MDB_txn* get_txn; - retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &get_txn); - if (MDB_SUCCESS != retval) { - LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval); - return false; - } - - retval = mdb_get(get_txn, dbi_, &mdbkey, &mdbdata); + retval = mdb_get(read_txn_, dbi_, &mdbkey, &mdbdata); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_get failed " << mdb_strerror(retval); return false; } - mdb_txn_abort(get_txn); - if (!VCoder::deserialize(reinterpret_cast(mdbdata.mv_data), mdbdata.mv_size, value)) { LOG(ERROR) << "failed to deserialize value"; @@ -160,14 +158,8 @@ bool LmdbDataset::first_key(K* key) { int retval; - MDB_txn* iter_txn; - - retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn); - CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed " - << mdb_strerror(retval); - MDB_cursor* cursor; - retval = mdb_cursor_open(iter_txn, dbi_, &cursor); + retval = mdb_cursor_open(read_txn_, dbi_, &cursor); CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); MDB_val mdbkey; MDB_val mdbval; @@ -175,7 +167,6 @@ bool LmdbDataset::first_key(K* key) { CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); mdb_cursor_close(cursor); - mdb_txn_abort(iter_txn); if (!KCoder::deserialize(reinterpret_cast(mdbkey.mv_data), mdbkey.mv_size, key)) { @@ -192,14 +183,8 @@ bool LmdbDataset::last_key(K* key) { int retval; - MDB_txn* iter_txn; - - retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn); - CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed " - << mdb_strerror(retval); - MDB_cursor* cursor; - retval = mdb_cursor_open(iter_txn, dbi_, &cursor); + retval = mdb_cursor_open(read_txn_, dbi_, &cursor); CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); MDB_val mdbkey; MDB_val mdbval; @@ -207,7 +192,6 @@ bool LmdbDataset::last_key(K* key) { CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); mdb_cursor_close(cursor); - mdb_txn_abort(iter_txn); if (!KCoder::deserialize(reinterpret_cast(mdbkey.mv_data), mdbkey.mv_size, key)) { @@ -222,16 +206,24 @@ template bool LmdbDataset::commit() { DLOG(INFO) << "LMDB: Commit"; - CHECK_NOTNULL(txn_); + CHECK_NOTNULL(write_txn_); int retval; - retval = mdb_txn_commit(txn_); + retval = mdb_txn_commit(write_txn_); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_txn_commit failed " << mdb_strerror(retval); return false; } - retval = mdb_txn_begin(env_, NULL, 0, &txn_); + mdb_txn_abort(read_txn_); + + retval = mdb_txn_begin(env_, NULL, 0, &write_txn_); + if (MDB_SUCCESS != retval) { + LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval); + return false; + } + + retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_); if (MDB_SUCCESS != retval) { LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval); return false; @@ -245,11 +237,14 @@ void LmdbDataset::close() { DLOG(INFO) << "LMDB: Close"; if (env_ && dbi_) { + mdb_txn_abort(write_txn_); + mdb_txn_abort(read_txn_); mdb_close(env_, dbi_); mdb_env_close(env_); env_ = NULL; dbi_ = 0; - txn_ = NULL; + write_txn_ = NULL; + read_txn_ = NULL; } } @@ -268,14 +263,8 @@ typename LmdbDataset::const_iterator LmdbDataset::begin() const { int retval; - MDB_txn* iter_txn; - - retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn); - CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed " - << mdb_strerror(retval); - MDB_cursor* cursor; - retval = mdb_cursor_open(iter_txn, dbi_, &cursor); + retval = mdb_cursor_open(read_txn_, dbi_, &cursor); CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval); MDB_val key; MDB_val val; @@ -286,7 +275,9 @@ typename LmdbDataset::const_iterator shared_ptr state; if (MDB_SUCCESS == retval) { - state.reset(new LmdbState(cursor, iter_txn, &dbi_)); + state.reset(new LmdbState(cursor, read_txn_, &dbi_)); + } else { + mdb_cursor_close(cursor); } return const_iterator(this, state); } diff --git a/src/caffe/test/test_dataset.cpp b/src/caffe/test/test_dataset.cpp index e5bd1d2..6645ca2 100644 --- a/src/caffe/test/test_dataset.cpp +++ b/src/caffe/test/test_dataset.cpp @@ -751,6 +751,44 @@ TYPED_TEST(DatasetTest, TestReadOnlyGetNoCommitFails) { EXPECT_FALSE(dataset->get(key, &new_value)); } +TYPED_TEST(DatasetTest, TestCreateManyItersShortScope) { + UNPACK_TYPES; + + string name = this->DBName(); + shared_ptr > dataset = + DatasetFactory(backend); + EXPECT_TRUE(dataset->open(name, Dataset::New)); + + string key = this->TestKey(); + value_type value = this->TestValue(); + EXPECT_TRUE(dataset->put(key, value)); + EXPECT_TRUE(dataset->commit()); + + for (int i = 0; i < 1000; ++i) { + typename Dataset::const_iterator iter = + dataset->begin(); + } +} + +TYPED_TEST(DatasetTest, TestCreateManyItersLongScope) { + UNPACK_TYPES; + + string name = this->DBName(); + shared_ptr > dataset = + DatasetFactory(backend); + EXPECT_TRUE(dataset->open(name, Dataset::New)); + + string key = this->TestKey(); + value_type value = this->TestValue(); + EXPECT_TRUE(dataset->put(key, value)); + EXPECT_TRUE(dataset->commit()); + + vector::const_iterator> iters; + for (int i = 0; i < 1000; ++i) { + iters.push_back(dataset->begin()); + } +} + #undef UNPACK_TYPES } // namespace caffe