#define CAFFE_UTIL_DB_LMDB_HPP
#include <string>
+#include <vector>
#include "lmdb.h"
class LMDBTransaction : public Transaction {
public:
- explicit LMDBTransaction(MDB_dbi* mdb_dbi, MDB_txn* mdb_txn)
- : mdb_dbi_(mdb_dbi), mdb_txn_(mdb_txn) { }
+ explicit LMDBTransaction(MDB_env* mdb_env)
+ : mdb_env_(mdb_env) { }
virtual void Put(const string& key, const string& value);
- virtual void Commit() { MDB_CHECK(mdb_txn_commit(mdb_txn_)); }
+ virtual void Commit();
private:
- MDB_dbi* mdb_dbi_;
- MDB_txn* mdb_txn_;
+ MDB_env* mdb_env_;
+ vector<string> keys, values;
+
+ void DoubleMapSize();
DISABLE_COPY_AND_ASSIGN(LMDBTransaction);
};
namespace caffe { namespace db {
-const size_t LMDB_MAP_SIZE = 1099511627776; // 1 TB
-
void LMDB::Open(const string& source, Mode mode) {
MDB_CHECK(mdb_env_create(&mdb_env_));
- MDB_CHECK(mdb_env_set_mapsize(mdb_env_, LMDB_MAP_SIZE));
if (mode == NEW) {
CHECK_EQ(mkdir(source.c_str(), 0744), 0) << "mkdir " << source << "failed";
}
}
LMDBTransaction* LMDB::NewTransaction() {
- MDB_txn* mdb_txn;
- MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn));
- MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi_));
- return new LMDBTransaction(&mdb_dbi_, mdb_txn);
+ return new LMDBTransaction(mdb_env_);
}
void LMDBTransaction::Put(const string& key, const string& value) {
- MDB_val mdb_key, mdb_value;
- mdb_key.mv_data = const_cast<char*>(key.data());
- mdb_key.mv_size = key.size();
- mdb_value.mv_data = const_cast<char*>(value.data());
- mdb_value.mv_size = value.size();
- MDB_CHECK(mdb_put(mdb_txn_, *mdb_dbi_, &mdb_key, &mdb_value, 0));
+ keys.push_back(key);
+ values.push_back(value);
+}
+
+void LMDBTransaction::Commit() {
+ MDB_dbi mdb_dbi;
+ MDB_val mdb_key, mdb_data;
+ MDB_txn *mdb_txn;
+
+ // Initialize MDB variables
+ MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn));
+ MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi));
+
+ bool out_of_memory = false;
+ for (int i = 0; i < keys.size(); i++) {
+ mdb_key.mv_size = keys[i].size();
+ mdb_key.mv_data = const_cast<char*>(keys[i].data());
+ mdb_data.mv_size = values[i].size();
+ mdb_data.mv_data = const_cast<char*>(values[i].data());
+
+ int put_rc = mdb_put(mdb_txn, mdb_dbi, &mdb_key, &mdb_data, 0);
+ if (put_rc == MDB_MAP_FULL) {
+ out_of_memory = true;
+ break;
+ } else {
+ // Failed for some other reason
+ MDB_CHECK(put_rc);
+ }
+ }
+
+ if (!out_of_memory) {
+ // Commit the transaction
+ MDB_CHECK(mdb_txn_commit(mdb_txn));
+ mdb_dbi_close(mdb_env_, mdb_dbi);
+ keys.clear();
+ values.clear();
+ } else {
+ // Double the map size and retry
+ mdb_txn_abort(mdb_txn);
+ mdb_dbi_close(mdb_env_, mdb_dbi);
+ DoubleMapSize();
+ Commit();
+ }
+}
+
+void LMDBTransaction::DoubleMapSize() {
+ struct MDB_envinfo current_info;
+ MDB_CHECK(mdb_env_info(mdb_env_, ¤t_info));
+ size_t new_size = current_info.me_mapsize * 2;
+ DLOG(INFO) << "Doubling LMDB map size to " << (new_size>>20) << "MB ...";
+ MDB_CHECK(mdb_env_set_mapsize(mdb_env_, new_size));
}
} // namespace db