LMDB doesn't support many concurrent read-only transactions, so this preallocates...
authorKevin James Matzen <kmatzen@cs.cornell.edu>
Sat, 18 Oct 2014 06:51:29 +0000 (02:51 -0400)
committerKevin James Matzen <kmatzen@cs.cornell.edu>
Sat, 18 Oct 2014 06:51:29 +0000 (02:51 -0400)
include/caffe/dataset.hpp
include/caffe/leveldb_dataset.hpp
include/caffe/lmdb_dataset.hpp
src/caffe/lmdb_dataset.cpp
src/caffe/test/test_dataset.cpp

index 90deb31..1dd8458 100644 (file)
@@ -170,7 +170,6 @@ class Dataset {
     iterator(const Dataset* parent, shared_ptr<DatasetState> state)
         : parent_(parent),
           state_(state) { }
-    ~iterator() { }
 
     iterator(const iterator& other)
         : parent_(other.parent_),
index f6c57fe..d58c181 100644 (file)
@@ -61,12 +61,11 @@ class LeveldbDataset : public Dataset<K, V, KCoder, VCoder> {
     shared_ptr<DatasetState> clone() {
       shared_ptr<leveldb::Iterator> new_iter;
 
-      if (iter_.get()) {
-        new_iter.reset(db_->NewIterator(leveldb::ReadOptions()));
-        CHECK(iter_->Valid());
-        new_iter->Seek(iter_->key());
-        CHECK(new_iter->Valid());
-      }
+      CHECK(iter_.get());
+      new_iter.reset(db_->NewIterator(leveldb::ReadOptions()));
+      CHECK(iter_->Valid());
+      new_iter->Seek(iter_->key());
+      CHECK(new_iter->Valid());
 
       return shared_ptr<DatasetState>(new LeveldbState(db_, new_iter));
     }
index 71b3224..ac1e5ee 100644 (file)
@@ -28,7 +28,8 @@ class LmdbDataset : public Dataset<K, V, KCoder, VCoder> {
   LmdbDataset()
       : env_(NULL),
         dbi_(0),
-        txn_(NULL) { }
+        write_txn_(NULL),
+        read_txn_(NULL) { }
 
   bool open(const string& filename, Mode mode);
   bool put(const K& key, const V& value);
@@ -55,21 +56,19 @@ class LmdbDataset : public Dataset<K, V, KCoder, VCoder> {
           dbi_(dbi) { }
 
     shared_ptr<DatasetState> clone() {
+      CHECK(cursor_);
+
       MDB_cursor* new_cursor;
+      int retval;
 
-      if (cursor_) {
-        int retval;
-        retval = mdb_cursor_open(txn_, *dbi_, &new_cursor);
-        CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
-        MDB_val key;
-        MDB_val val;
-        retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT);
-        CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
-        retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET);
-        CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval);
-      } else {
-        new_cursor = cursor_;
-      }
+      retval = mdb_cursor_open(txn_, *dbi_, &new_cursor);
+      CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
+      MDB_val key;
+      MDB_val val;
+      retval = mdb_cursor_get(cursor_, &key, &val, MDB_GET_CURRENT);
+      CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
+      retval = mdb_cursor_get(new_cursor, &key, &val, MDB_SET);
+      CHECK_EQ(MDB_SUCCESS, retval) << mdb_strerror(retval);
 
       return shared_ptr<DatasetState>(new LmdbState(new_cursor, txn_, dbi_));
     }
@@ -87,7 +86,8 @@ class LmdbDataset : public Dataset<K, V, KCoder, VCoder> {
 
   MDB_env* env_;
   MDB_dbi dbi_;
-  MDB_txn* txn_;
+  MDB_txn* write_txn_;
+  MDB_txn* read_txn_;
 };
 
 }  // namespace caffe
index 23852f0..8f8e68e 100644 (file)
@@ -15,7 +15,8 @@ bool LmdbDataset<K, V, KCoder, VCoder>::open(const string& filename,
   DLOG(INFO) << "LMDB: Open " << filename;
 
   CHECK(NULL == env_);
-  CHECK(NULL == txn_);
+  CHECK(NULL == write_txn_);
+  CHECK(NULL == read_txn_);
   CHECK_EQ(0, dbi_);
 
   int retval;
@@ -66,13 +67,19 @@ bool LmdbDataset<K, V, KCoder, VCoder>::open(const string& filename,
     return false;
   }
 
-  retval = mdb_txn_begin(env_, NULL, flag2, &txn_);
+  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
     return false;
   }
 
-  retval = mdb_open(txn_, NULL, 0, &dbi_);
+  retval = mdb_txn_begin(env_, NULL, flag2, &write_txn_);
+  if (MDB_SUCCESS != retval) {
+    LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
+    return false;
+  }
+
+  retval = mdb_open(write_txn_, NULL, 0, &dbi_);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_open failed" << mdb_strerror(retval);
     return false;
@@ -103,10 +110,10 @@ bool LmdbDataset<K, V, KCoder, VCoder>::put(const K& key, const V& value) {
   mdbkey.mv_size = serialized_key.size();
   mdbkey.mv_data = serialized_key.data();
 
-  CHECK_NOTNULL(txn_);
+  CHECK_NOTNULL(write_txn_);
   CHECK_NE(0, dbi_);
 
-  int retval = mdb_put(txn_, dbi_, &mdbkey, &mdbdata, 0);
+  int retval = mdb_put(write_txn_, dbi_, &mdbkey, &mdbdata, 0);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_put failed " << mdb_strerror(retval);
     return false;
@@ -130,21 +137,12 @@ bool LmdbDataset<K, V, KCoder, VCoder>::get(const K& key, V* value) {
   mdbkey.mv_size = serialized_key.size();
 
   int retval;
-  MDB_txn* get_txn;
-  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &get_txn);
-  if (MDB_SUCCESS != retval) {
-    LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
-    return false;
-  }
-
-  retval = mdb_get(get_txn, dbi_, &mdbkey, &mdbdata);
+  retval = mdb_get(read_txn_, dbi_, &mdbkey, &mdbdata);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_get failed " << mdb_strerror(retval);
     return false;
   }
 
-  mdb_txn_abort(get_txn);
-
   if (!VCoder::deserialize(reinterpret_cast<char*>(mdbdata.mv_data),
       mdbdata.mv_size, value)) {
     LOG(ERROR) << "failed to deserialize value";
@@ -160,14 +158,8 @@ bool LmdbDataset<K, V, KCoder, VCoder>::first_key(K* key) {
 
   int retval;
 
-  MDB_txn* iter_txn;
-
-  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
-  CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
-      << mdb_strerror(retval);
-
   MDB_cursor* cursor;
-  retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+  retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
   CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
   MDB_val mdbkey;
   MDB_val mdbval;
@@ -175,7 +167,6 @@ bool LmdbDataset<K, V, KCoder, VCoder>::first_key(K* key) {
   CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
 
   mdb_cursor_close(cursor);
-  mdb_txn_abort(iter_txn);
 
   if (!KCoder::deserialize(reinterpret_cast<char*>(mdbkey.mv_data),
       mdbkey.mv_size, key)) {
@@ -192,14 +183,8 @@ bool LmdbDataset<K, V, KCoder, VCoder>::last_key(K* key) {
 
   int retval;
 
-  MDB_txn* iter_txn;
-
-  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
-  CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
-      << mdb_strerror(retval);
-
   MDB_cursor* cursor;
-  retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+  retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
   CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
   MDB_val mdbkey;
   MDB_val mdbval;
@@ -207,7 +192,6 @@ bool LmdbDataset<K, V, KCoder, VCoder>::last_key(K* key) {
   CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
 
   mdb_cursor_close(cursor);
-  mdb_txn_abort(iter_txn);
 
   if (!KCoder::deserialize(reinterpret_cast<char*>(mdbkey.mv_data),
       mdbkey.mv_size, key)) {
@@ -222,16 +206,24 @@ template <typename K, typename V, typename KCoder, typename VCoder>
 bool LmdbDataset<K, V, KCoder, VCoder>::commit() {
   DLOG(INFO) << "LMDB: Commit";
 
-  CHECK_NOTNULL(txn_);
+  CHECK_NOTNULL(write_txn_);
 
   int retval;
-  retval = mdb_txn_commit(txn_);
+  retval = mdb_txn_commit(write_txn_);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_txn_commit failed " << mdb_strerror(retval);
     return false;
   }
 
-  retval = mdb_txn_begin(env_, NULL, 0, &txn_);
+  mdb_txn_abort(read_txn_);
+
+  retval = mdb_txn_begin(env_, NULL, 0, &write_txn_);
+  if (MDB_SUCCESS != retval) {
+    LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
+    return false;
+  }
+
+  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &read_txn_);
   if (MDB_SUCCESS != retval) {
     LOG(ERROR) << "mdb_txn_begin failed " << mdb_strerror(retval);
     return false;
@@ -245,11 +237,14 @@ void LmdbDataset<K, V, KCoder, VCoder>::close() {
   DLOG(INFO) << "LMDB: Close";
 
   if (env_ && dbi_) {
+    mdb_txn_abort(write_txn_);
+    mdb_txn_abort(read_txn_);
     mdb_close(env_, dbi_);
     mdb_env_close(env_);
     env_ = NULL;
     dbi_ = 0;
-    txn_ = NULL;
+    write_txn_ = NULL;
+    read_txn_ = NULL;
   }
 }
 
@@ -268,14 +263,8 @@ typename LmdbDataset<K, V, KCoder, VCoder>::const_iterator
     LmdbDataset<K, V, KCoder, VCoder>::begin() const {
   int retval;
 
-  MDB_txn* iter_txn;
-
-  retval = mdb_txn_begin(env_, NULL, MDB_RDONLY, &iter_txn);
-  CHECK_EQ(MDB_SUCCESS, retval) << "mdb_txn_begin failed "
-      << mdb_strerror(retval);
-
   MDB_cursor* cursor;
-  retval = mdb_cursor_open(iter_txn, dbi_, &cursor);
+  retval = mdb_cursor_open(read_txn_, dbi_, &cursor);
   CHECK_EQ(retval, MDB_SUCCESS) << mdb_strerror(retval);
   MDB_val key;
   MDB_val val;
@@ -286,7 +275,9 @@ typename LmdbDataset<K, V, KCoder, VCoder>::const_iterator
 
   shared_ptr<DatasetState> state;
   if (MDB_SUCCESS == retval) {
-    state.reset(new LmdbState(cursor, iter_txn, &dbi_));
+    state.reset(new LmdbState(cursor, read_txn_, &dbi_));
+  } else {
+    mdb_cursor_close(cursor);
   }
   return const_iterator(this, state);
 }
index e5bd1d2..6645ca2 100644 (file)
@@ -751,6 +751,44 @@ TYPED_TEST(DatasetTest, TestReadOnlyGetNoCommitFails) {
   EXPECT_FALSE(dataset->get(key, &new_value));
 }
 
+TYPED_TEST(DatasetTest, TestCreateManyItersShortScope) {
+  UNPACK_TYPES;
+
+  string name = this->DBName();
+  shared_ptr<Dataset<string, value_type> > dataset =
+      DatasetFactory<string, value_type>(backend);
+  EXPECT_TRUE(dataset->open(name, Dataset<string, value_type>::New));
+
+  string key = this->TestKey();
+  value_type value = this->TestValue();
+  EXPECT_TRUE(dataset->put(key, value));
+  EXPECT_TRUE(dataset->commit());
+
+  for (int i = 0; i < 1000; ++i) {
+    typename Dataset<string, value_type>::const_iterator iter =
+        dataset->begin();
+  }
+}
+
+TYPED_TEST(DatasetTest, TestCreateManyItersLongScope) {
+  UNPACK_TYPES;
+
+  string name = this->DBName();
+  shared_ptr<Dataset<string, value_type> > dataset =
+      DatasetFactory<string, value_type>(backend);
+  EXPECT_TRUE(dataset->open(name, Dataset<string, value_type>::New));
+
+  string key = this->TestKey();
+  value_type value = this->TestValue();
+  EXPECT_TRUE(dataset->put(key, value));
+  EXPECT_TRUE(dataset->commit());
+
+  vector<typename Dataset<string, value_type>::const_iterator> iters;
+  for (int i = 0; i < 1000; ++i) {
+    iters.push_back(dataset->begin());
+  }
+}
+
 #undef UNPACK_TYPES
 
 }  // namespace caffe