==============================================================================*/
#include "tensorflow/contrib/tensorboard/db/summary_db_writer.h"
+#include <deque>
+
#include "tensorflow/contrib/tensorboard/db/summary_converter.h"
#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/framework/node_def.pb.h"
const char* kAudioPluginName = "audio";
const char* kHistogramPluginName = "histograms";
-const int kScalarSlots = 10000;
-const int kImageSlots = 10;
-const int kAudioSlots = 10;
-const int kHistogramSlots = 1;
-const int kTensorSlots = 10;
-
const int64 kReserveMinBytes = 32;
const double kReserveMultiplier = 1.5;
+const int64 kPreallocateRows = 1000;
// Flush is a misnomer because what we're actually doing is having lots
// of commits inside any SqliteTransaction that writes potentially
}
}
-int GetSlots(const Tensor& t, const SummaryMetadata& metadata) {
- if (metadata.plugin_data().plugin_name() == kScalarPluginName) {
- return kScalarSlots;
- } else if (metadata.plugin_data().plugin_name() == kImagePluginName) {
- return kImageSlots;
- } else if (metadata.plugin_data().plugin_name() == kAudioPluginName) {
- return kAudioSlots;
- } else if (metadata.plugin_data().plugin_name() == kHistogramPluginName) {
- return kHistogramSlots;
- } else if (t.dims() == 0 && t.dtype() != DT_STRING) {
- return kScalarSlots;
- } else {
- return kTensorSlots;
- }
-}
-
Status SetDescription(Sqlite* db, int64 id, const StringPiece& markdown) {
const char* sql = R"sql(
INSERT OR REPLACE INTO Descriptions (id, description) VALUES (?, ?)
return insert.StepAndReset();
}
- Status GetIsWatching(Sqlite* db, bool* is_watching)
- SQLITE_TRANSACTIONS_EXCLUDED(*db) LOCKS_EXCLUDED(mu_) {
- mutex_lock lock(mu_);
- if (experiment_id_ == kAbsent) {
- *is_watching = true;
- return Status::OK();
- }
- const char* sql = R"sql(
- SELECT is_watching FROM Experiments WHERE experiment_id = ?
- )sql";
- SqliteStatement stmt;
- TF_RETURN_IF_ERROR(db->Prepare(sql, &stmt));
- stmt.BindInt(1, experiment_id_);
- TF_RETURN_IF_ERROR(stmt.StepOnce());
- *is_watching = stmt.ColumnInt(0) != 0;
- return Status::OK();
- }
-
private:
Status InitializeUser(Sqlite* db, uint64 now) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (user_id_ != kAbsent || user_name_.empty()) return Status::OK();
/// \brief Tensor writer for a single series, e.g. Tag.
///
-/// This class can be used to write an infinite stream of Tensors to the
-/// database in a fixed block of contiguous disk space. This is
-/// accomplished using Algorithm R reservoir sampling.
-///
-/// The reservoir consists of a fixed number of rows, which are inserted
-/// using ZEROBLOB upon receiving the first sample, which is used to
-/// predict how big the other ones are likely to be. This is done
-/// transactionally in a way that tries to be mindful of other processes
-/// that might be trying to access the same DB.
-///
-/// Once the reservoir fills up, rows are replaced at random, and writes
-/// gradually become no-ops. This allows long training to go fast
-/// without configuration. The exception is when someone is actually
-/// looking at TensorBoard. When that happens, the "keep last" behavior
-/// is turned on and Append() will always result in a write.
-///
-/// If no one is watching training, this class still holds on to the
-/// most recent "dangling" Tensor, so if Finish() is called, the most
-/// recent training state can be written to disk.
-///
-/// The randomly selected sampling points should be consistent across
-/// multiple instances.
-///
/// This class is thread safe.
class SeriesWriter {
public:
- SeriesWriter(int64 series, int slots, RunMetadata* meta)
- : series_{series},
- slots_{slots},
- meta_{meta},
- rng_{std::mt19937_64::default_seed} {
+ SeriesWriter(int64 series, RunMetadata* meta) : series_{series}, meta_{meta} {
DCHECK(series_ > 0);
- DCHECK(slots_ > 0);
}
Status Append(Sqlite* db, int64 step, uint64 now, double computed_time,
- Tensor t) SQLITE_TRANSACTIONS_EXCLUDED(*db)
+ const Tensor& t) SQLITE_TRANSACTIONS_EXCLUDED(*db)
LOCKS_EXCLUDED(mu_) {
mutex_lock lock(mu_);
if (rowids_.empty()) {
return s;
}
}
- DCHECK(rowids_.size() == slots_);
- int64 rowid;
- size_t i = count_;
- if (i < slots_) {
- rowid = last_rowid_ = rowids_[i];
- } else {
- i = rng_() % (i + 1);
- if (i < slots_) {
- rowid = last_rowid_ = rowids_[i];
- } else {
- bool keep_last;
- TF_RETURN_IF_ERROR(meta_->GetIsWatching(db, &keep_last));
- if (!keep_last) {
- ++count_;
- dangling_tensor_.reset(new Tensor(std::move(t)));
- dangling_step_ = step;
- dangling_computed_time_ = computed_time;
- return Status::OK();
- }
- rowid = last_rowid_;
- }
- }
+ int64 rowid = rowids_.front();
Status s = Write(db, rowid, step, computed_time, t);
if (s.ok()) {
++count_;
- dangling_tensor_.reset();
}
+ rowids_.pop_front();
return s;
}
Status Finish(Sqlite* db) SQLITE_TRANSACTIONS_EXCLUDED(*db)
LOCKS_EXCLUDED(mu_) {
mutex_lock lock(mu_);
- // Short runs: Delete unused pre-allocated Tensors.
- if (count_ < rowids_.size()) {
+ // Delete unused pre-allocated Tensors.
+ if (!rowids_.empty()) {
SqliteTransaction txn(*db);
const char* sql = R"sql(
DELETE FROM Tensors WHERE rowid = ?
SqliteStatement deleter;
TF_RETURN_IF_ERROR(db->Prepare(sql, &deleter));
-      for (size_t i = count_; i < rowids_.size(); ++i) {
-        deleter.BindInt(1, rowids_[i]);
+      while (!rowids_.empty()) {
+        deleter.BindInt(1, rowids_.front());
         TF_RETURN_IF_ERROR(deleter.StepAndReset());
+        rowids_.pop_front();
}
TF_RETURN_IF_ERROR(txn.Commit());
rowids_.clear();
}
- // Long runs: Make last sample be the very most recent one.
- if (dangling_tensor_) {
- DCHECK(last_rowid_ != kAbsent);
- TF_RETURN_IF_ERROR(Write(db, last_rowid_, dangling_step_,
- dangling_computed_time_, *dangling_tensor_));
- dangling_tensor_.reset();
- }
return Status::OK();
}
Status Update(Sqlite* db, int64 step, double computed_time, const Tensor& t,
const StringPiece& data, int64 rowid) {
- // TODO(jart): How can we ensure reservoir fills on replace?
const char* sql = R"sql(
UPDATE OR REPLACE
Tensors
// TODO(jart): Maybe preallocate index pages by setting step. This
// is tricky because UPDATE OR REPLACE can have a side
// effect of deleting preallocated rows.
- for (int64 i = 0; i < slots_; ++i) {
+ for (int64 i = 0; i < kPreallocateRows; ++i) {
insert.BindInt(1, series_);
insert.BindInt(2, reserved_bytes);
TF_RETURN_WITH_CONTEXT_IF_ERROR(insert.StepAndReset(), "i=", i);
mutex mu_;
const int64 series_;
- const int slots_;
RunMetadata* const meta_;
- std::mt19937_64 rng_ GUARDED_BY(mu_);
uint64 count_ GUARDED_BY(mu_) = 0;
- int64 last_rowid_ GUARDED_BY(mu_) = kAbsent;
- std::vector<int64> rowids_ GUARDED_BY(mu_);
+ std::deque<int64> rowids_ GUARDED_BY(mu_);
uint64 unflushed_bytes_ GUARDED_BY(mu_) = 0;
- std::unique_ptr<Tensor> dangling_tensor_ GUARDED_BY(mu_);
- int64 dangling_step_ GUARDED_BY(mu_) = 0;
- double dangling_computed_time_ GUARDED_BY(mu_) = 0.0;
TF_DISALLOW_COPY_AND_ASSIGN(SeriesWriter);
};
explicit RunWriter(RunMetadata* meta) : meta_{meta} {}
Status Append(Sqlite* db, int64 tag_id, int64 step, uint64 now,
- double computed_time, Tensor t, int slots)
+ double computed_time, const Tensor& t)
SQLITE_TRANSACTIONS_EXCLUDED(*db) LOCKS_EXCLUDED(mu_) {
- SeriesWriter* writer = GetSeriesWriter(tag_id, slots);
- return writer->Append(db, step, now, computed_time, std::move(t));
+ SeriesWriter* writer = GetSeriesWriter(tag_id);
+ return writer->Append(db, step, now, computed_time, t);
}
Status Finish(Sqlite* db) SQLITE_TRANSACTIONS_EXCLUDED(*db)
}
private:
- SeriesWriter* GetSeriesWriter(int64 tag_id, int slots) LOCKS_EXCLUDED(mu_) {
+ SeriesWriter* GetSeriesWriter(int64 tag_id) LOCKS_EXCLUDED(mu_) {
mutex_lock sl(mu_);
auto spot = series_writers_.find(tag_id);
if (spot == series_writers_.end()) {
- SeriesWriter* writer = new SeriesWriter(tag_id, slots, meta_);
+ SeriesWriter* writer = new SeriesWriter(tag_id, meta_);
series_writers_[tag_id].reset(writer);
return writer;
} else {
TF_RETURN_IF_ERROR(
meta_.GetTagId(db_, now, computed_time, tag, &tag_id, metadata));
TF_RETURN_WITH_CONTEXT_IF_ERROR(
- run_.Append(db_, tag_id, step, now, computed_time, t,
- GetSlots(t, metadata)),
+ run_.Append(db_, tag_id, step, now, computed_time, t),
meta_.user_name(), "/", meta_.experiment_name(), "/", meta_.run_name(),
"/", tag, "@", step);
return Status::OK();
int64 tag_id;
TF_RETURN_IF_ERROR(meta_.GetTagId(db_, now, e->wall_time(), s->tag(),
&tag_id, s->metadata()));
- return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t,
- GetSlots(t, s->metadata()));
+ return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t);
}
// TODO(jart): Refactor Summary -> Tensor logic into separate file.
PatchPluginName(s->mutable_metadata(), kScalarPluginName);
TF_RETURN_IF_ERROR(meta_.GetTagId(db_, now, e->wall_time(), s->tag(),
&tag_id, s->metadata()));
- return run_.Append(db_, tag_id, e->step(), now, e->wall_time(),
- std::move(t), kScalarSlots);
+ return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t);
}
Status MigrateHistogram(const Event* e, Summary::Value* s, uint64 now) {
PatchPluginName(s->mutable_metadata(), kHistogramPluginName);
TF_RETURN_IF_ERROR(meta_.GetTagId(db_, now, e->wall_time(), s->tag(),
&tag_id, s->metadata()));
- return run_.Append(db_, tag_id, e->step(), now, e->wall_time(),
- std::move(t), kHistogramSlots);
+ return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t);
}
Status MigrateImage(const Event* e, Summary::Value* s, uint64 now) {
PatchPluginName(s->mutable_metadata(), kImagePluginName);
TF_RETURN_IF_ERROR(meta_.GetTagId(db_, now, e->wall_time(), s->tag(),
&tag_id, s->metadata()));
- return run_.Append(db_, tag_id, e->step(), now, e->wall_time(),
- std::move(t), kImageSlots);
+ return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t);
}
Status MigrateAudio(const Event* e, Summary::Value* s, uint64 now) {
PatchPluginName(s->mutable_metadata(), kAudioPluginName);
TF_RETURN_IF_ERROR(meta_.GetTagId(db_, now, e->wall_time(), s->tag(),
&tag_id, s->metadata()));
- return run_.Append(db_, tag_id, e->step(), now, e->wall_time(),
- std::move(t), kAudioSlots);
+ return run_.Append(db_, tag_id, e->step(), now, e->wall_time(), t);
}
Env* const env_;