Initialize Tizen 2.3
[external/leveldb.git] / db / db_impl.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/db_impl.h"
6
7 #include <algorithm>
8 #include <set>
9 #include <string>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <vector>
13 #include "db/builder.h"
14 #include "db/db_iter.h"
15 #include "db/dbformat.h"
16 #include "db/filename.h"
17 #include "db/log_reader.h"
18 #include "db/log_writer.h"
19 #include "db/memtable.h"
20 #include "db/table_cache.h"
21 #include "db/version_set.h"
22 #include "db/write_batch_internal.h"
23 #include "leveldb/db.h"
24 #include "leveldb/env.h"
25 #include "leveldb/status.h"
26 #include "leveldb/table.h"
27 #include "leveldb/table_builder.h"
28 #include "port/port.h"
29 #include "table/block.h"
30 #include "table/merger.h"
31 #include "table/two_level_iterator.h"
32 #include "util/coding.h"
33 #include "util/logging.h"
34 #include "util/mutexlock.h"
35
36 namespace leveldb {
37
38 struct DBImpl::CompactionState {
39   Compaction* const compaction;
40
41   // Sequence numbers < smallest_snapshot are not significant since we
42   // will never have to service a snapshot below smallest_snapshot.
43   // Therefore if we have seen a sequence number S <= smallest_snapshot,
44   // we can drop all entries for the same key with sequence numbers < S.
45   SequenceNumber smallest_snapshot;
46
47   // Files produced by compaction
48   struct Output {
49     uint64_t number;
50     uint64_t file_size;
51     InternalKey smallest, largest;
52   };
53   std::vector<Output> outputs;
54
55   // State kept for output being generated
56   WritableFile* outfile;
57   TableBuilder* builder;
58
59   uint64_t total_bytes;
60
61   Output* current_output() { return &outputs[outputs.size()-1]; }
62
63   explicit CompactionState(Compaction* c)
64       : compaction(c),
65         outfile(NULL),
66         builder(NULL),
67         total_bytes(0) {
68   }
69 };
70
71 // Fix user-supplied options to be reasonable
72 template <class T,class V>
73 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
74   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
75   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
76 }
77 Options SanitizeOptions(const std::string& dbname,
78                         const InternalKeyComparator* icmp,
79                         const Options& src) {
80   Options result = src;
81   result.comparator = icmp;
82   ClipToRange(&result.max_open_files,           20,     50000);
83   ClipToRange(&result.write_buffer_size,        64<<10, 1<<30);
84   ClipToRange(&result.block_size,               1<<10,  4<<20);
85   if (result.info_log == NULL) {
86     // Open a log file in the same directory as the db
87     src.env->CreateDir(dbname);  // In case it does not exist
88     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
89     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
90     if (!s.ok()) {
91       // No place suitable for logging
92       result.info_log = NULL;
93     }
94   }
95   if (result.block_cache == NULL) {
96     result.block_cache = NewLRUCache(8 << 20);
97   }
98   return result;
99 }
100
101 DBImpl::DBImpl(const Options& options, const std::string& dbname)
102     : env_(options.env),
103       internal_comparator_(options.comparator),
104       options_(SanitizeOptions(dbname, &internal_comparator_, options)),
105       owns_info_log_(options_.info_log != options.info_log),
106       owns_cache_(options_.block_cache != options.block_cache),
107       dbname_(dbname),
108       db_lock_(NULL),
109       shutting_down_(NULL),
110       bg_cv_(&mutex_),
111       mem_(new MemTable(internal_comparator_)),
112       imm_(NULL),
113       logfile_(NULL),
114       logfile_number_(0),
115       log_(NULL),
116       logger_(NULL),
117       logger_cv_(&mutex_),
118       bg_compaction_scheduled_(false),
119       manual_compaction_(NULL) {
120   mem_->Ref();
121   has_imm_.Release_Store(NULL);
122
123   // Reserve ten files or so for other uses and give the rest to TableCache.
124   const int table_cache_size = options.max_open_files - 10;
125   table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
126
127   versions_ = new VersionSet(dbname_, &options_, table_cache_,
128                              &internal_comparator_);
129 }
130
131 DBImpl::~DBImpl() {
132   // Wait for background work to finish
133   mutex_.Lock();
134   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
135   while (bg_compaction_scheduled_) {
136     bg_cv_.Wait();
137   }
138   mutex_.Unlock();
139
140   if (db_lock_ != NULL) {
141     env_->UnlockFile(db_lock_);
142   }
143
144   delete versions_;
145   if (mem_ != NULL) mem_->Unref();
146   if (imm_ != NULL) imm_->Unref();
147   delete log_;
148   delete logfile_;
149   delete table_cache_;
150
151   if (owns_info_log_) {
152     delete options_.info_log;
153   }
154   if (owns_cache_) {
155     delete options_.block_cache;
156   }
157 }
158
159 Status DBImpl::NewDB() {
160   VersionEdit new_db;
161   new_db.SetComparatorName(user_comparator()->Name());
162   new_db.SetLogNumber(0);
163   new_db.SetNextFile(2);
164   new_db.SetLastSequence(0);
165
166   const std::string manifest = DescriptorFileName(dbname_, 1);
167   WritableFile* file;
168   Status s = env_->NewWritableFile(manifest, &file);
169   if (!s.ok()) {
170     return s;
171   }
172   {
173     log::Writer log(file);
174     std::string record;
175     new_db.EncodeTo(&record);
176     s = log.AddRecord(record);
177     if (s.ok()) {
178       s = file->Close();
179     }
180   }
181   delete file;
182   if (s.ok()) {
183     // Make "CURRENT" file that points to the new manifest file.
184     s = SetCurrentFile(env_, dbname_, 1);
185   } else {
186     env_->DeleteFile(manifest);
187   }
188   return s;
189 }
190
191 void DBImpl::MaybeIgnoreError(Status* s) const {
192   if (s->ok() || options_.paranoid_checks) {
193     // No change needed
194   } else {
195     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
196     *s = Status::OK();
197   }
198 }
199
200 void DBImpl::DeleteObsoleteFiles() {
201   // Make a set of all of the live files
202   std::set<uint64_t> live = pending_outputs_;
203   versions_->AddLiveFiles(&live);
204
205   std::vector<std::string> filenames;
206   env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
207   uint64_t number;
208   FileType type;
209   for (size_t i = 0; i < filenames.size(); i++) {
210     if (ParseFileName(filenames[i], &number, &type)) {
211       bool keep = true;
212       switch (type) {
213         case kLogFile:
214           keep = ((number >= versions_->LogNumber()) ||
215                   (number == versions_->PrevLogNumber()));
216           break;
217         case kDescriptorFile:
218           // Keep my manifest file, and any newer incarnations'
219           // (in case there is a race that allows other incarnations)
220           keep = (number >= versions_->ManifestFileNumber());
221           break;
222         case kTableFile:
223           keep = (live.find(number) != live.end());
224           break;
225         case kTempFile:
226           // Any temp files that are currently being written to must
227           // be recorded in pending_outputs_, which is inserted into "live"
228           keep = (live.find(number) != live.end());
229           break;
230         case kCurrentFile:
231         case kDBLockFile:
232         case kInfoLogFile:
233           keep = true;
234           break;
235       }
236
237       if (!keep) {
238         if (type == kTableFile) {
239           table_cache_->Evict(number);
240         }
241         Log(options_.info_log, "Delete type=%d #%lld\n",
242             int(type),
243             static_cast<unsigned long long>(number));
244         env_->DeleteFile(dbname_ + "/" + filenames[i]);
245       }
246     }
247   }
248 }
249
250 Status DBImpl::Recover(VersionEdit* edit) {
251   mutex_.AssertHeld();
252
253   // Ignore error from CreateDir since the creation of the DB is
254   // committed only when the descriptor is created, and this directory
255   // may already exist from a previous failed creation attempt.
256   env_->CreateDir(dbname_);
257   assert(db_lock_ == NULL);
258   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
259   if (!s.ok()) {
260     return s;
261   }
262
263   if (!env_->FileExists(CurrentFileName(dbname_))) {
264     if (options_.create_if_missing) {
265       s = NewDB();
266       if (!s.ok()) {
267         return s;
268       }
269     } else {
270       return Status::InvalidArgument(
271           dbname_, "does not exist (create_if_missing is false)");
272     }
273   } else {
274     if (options_.error_if_exists) {
275       return Status::InvalidArgument(
276           dbname_, "exists (error_if_exists is true)");
277     }
278   }
279
280   s = versions_->Recover();
281   if (s.ok()) {
282     SequenceNumber max_sequence(0);
283
284     // Recover from all newer log files than the ones named in the
285     // descriptor (new log files may have been added by the previous
286     // incarnation without registering them in the descriptor).
287     //
288     // Note that PrevLogNumber() is no longer used, but we pay
289     // attention to it in case we are recovering a database
290     // produced by an older version of leveldb.
291     const uint64_t min_log = versions_->LogNumber();
292     const uint64_t prev_log = versions_->PrevLogNumber();
293     std::vector<std::string> filenames;
294     s = env_->GetChildren(dbname_, &filenames);
295     if (!s.ok()) {
296       return s;
297     }
298     uint64_t number;
299     FileType type;
300     std::vector<uint64_t> logs;
301     for (size_t i = 0; i < filenames.size(); i++) {
302       if (ParseFileName(filenames[i], &number, &type)
303           && type == kLogFile
304           && ((number >= min_log) || (number == prev_log))) {
305         logs.push_back(number);
306       }
307     }
308
309     // Recover in the order in which the logs were generated
310     std::sort(logs.begin(), logs.end());
311     for (size_t i = 0; i < logs.size(); i++) {
312       s = RecoverLogFile(logs[i], edit, &max_sequence);
313
314       // The previous incarnation may not have written any MANIFEST
315       // records after allocating this log number.  So we manually
316       // update the file number allocation counter in VersionSet.
317       versions_->MarkFileNumberUsed(logs[i]);
318     }
319
320     if (s.ok()) {
321       if (versions_->LastSequence() < max_sequence) {
322         versions_->SetLastSequence(max_sequence);
323       }
324     }
325   }
326
327   return s;
328 }
329
330 Status DBImpl::RecoverLogFile(uint64_t log_number,
331                               VersionEdit* edit,
332                               SequenceNumber* max_sequence) {
333   struct LogReporter : public log::Reader::Reporter {
334     Env* env;
335     Logger* info_log;
336     const char* fname;
337     Status* status;  // NULL if options_.paranoid_checks==false
338     virtual void Corruption(size_t bytes, const Status& s) {
339       Log(info_log, "%s%s: dropping %d bytes; %s",
340           (this->status == NULL ? "(ignoring error) " : ""),
341           fname, static_cast<int>(bytes), s.ToString().c_str());
342       if (this->status != NULL && this->status->ok()) *this->status = s;
343     }
344   };
345
346   mutex_.AssertHeld();
347
348   // Open the log file
349   std::string fname = LogFileName(dbname_, log_number);
350   SequentialFile* file;
351   Status status = env_->NewSequentialFile(fname, &file);
352   if (!status.ok()) {
353     MaybeIgnoreError(&status);
354     return status;
355   }
356
357   // Create the log reader.
358   LogReporter reporter;
359   reporter.env = env_;
360   reporter.info_log = options_.info_log;
361   reporter.fname = fname.c_str();
362   reporter.status = (options_.paranoid_checks ? &status : NULL);
363   // We intentially make log::Reader do checksumming even if
364   // paranoid_checks==false so that corruptions cause entire commits
365   // to be skipped instead of propagating bad information (like overly
366   // large sequence numbers).
367   log::Reader reader(file, &reporter, true/*checksum*/,
368                      0/*initial_offset*/);
369   Log(options_.info_log, "Recovering log #%llu",
370       (unsigned long long) log_number);
371
372   // Read all the records and add to a memtable
373   std::string scratch;
374   Slice record;
375   WriteBatch batch;
376   MemTable* mem = NULL;
377   while (reader.ReadRecord(&record, &scratch) &&
378          status.ok()) {
379     if (record.size() < 12) {
380       reporter.Corruption(
381           record.size(), Status::Corruption("log record too small"));
382       continue;
383     }
384     WriteBatchInternal::SetContents(&batch, record);
385
386     if (mem == NULL) {
387       mem = new MemTable(internal_comparator_);
388       mem->Ref();
389     }
390     status = WriteBatchInternal::InsertInto(&batch, mem);
391     MaybeIgnoreError(&status);
392     if (!status.ok()) {
393       break;
394     }
395     const SequenceNumber last_seq =
396         WriteBatchInternal::Sequence(&batch) +
397         WriteBatchInternal::Count(&batch) - 1;
398     if (last_seq > *max_sequence) {
399       *max_sequence = last_seq;
400     }
401
402     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
403       status = WriteLevel0Table(mem, edit, NULL);
404       if (!status.ok()) {
405         // Reflect errors immediately so that conditions like full
406         // file-systems cause the DB::Open() to fail.
407         break;
408       }
409       mem->Unref();
410       mem = NULL;
411     }
412   }
413
414   if (status.ok() && mem != NULL) {
415     status = WriteLevel0Table(mem, edit, NULL);
416     // Reflect errors immediately so that conditions like full
417     // file-systems cause the DB::Open() to fail.
418   }
419
420   if (mem != NULL) mem->Unref();
421   delete file;
422   return status;
423 }
424
425 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
426                                 Version* base) {
427   mutex_.AssertHeld();
428   const uint64_t start_micros = env_->NowMicros();
429   FileMetaData meta;
430   meta.number = versions_->NewFileNumber();
431   pending_outputs_.insert(meta.number);
432   Iterator* iter = mem->NewIterator();
433   Log(options_.info_log, "Level-0 table #%llu: started",
434       (unsigned long long) meta.number);
435
436   Status s;
437   {
438     mutex_.Unlock();
439     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
440     mutex_.Lock();
441   }
442
443   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
444       (unsigned long long) meta.number,
445       (unsigned long long) meta.file_size,
446       s.ToString().c_str());
447   delete iter;
448   pending_outputs_.erase(meta.number);
449
450
451   // Note that if file_size is zero, the file has been deleted and
452   // should not be added to the manifest.
453   int level = 0;
454   if (s.ok() && meta.file_size > 0) {
455     const Slice min_user_key = meta.smallest.user_key();
456     const Slice max_user_key = meta.largest.user_key();
457     if (base != NULL) {
458       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
459     }
460     edit->AddFile(level, meta.number, meta.file_size,
461                   meta.smallest, meta.largest);
462   }
463
464   CompactionStats stats;
465   stats.micros = env_->NowMicros() - start_micros;
466   stats.bytes_written = meta.file_size;
467   stats_[level].Add(stats);
468   return s;
469 }
470
471 Status DBImpl::CompactMemTable() {
472   mutex_.AssertHeld();
473   assert(imm_ != NULL);
474
475   // Save the contents of the memtable as a new Table
476   VersionEdit edit;
477   Version* base = versions_->current();
478   base->Ref();
479   Status s = WriteLevel0Table(imm_, &edit, base);
480   base->Unref();
481
482   if (s.ok() && shutting_down_.Acquire_Load()) {
483     s = Status::IOError("Deleting DB during memtable compaction");
484   }
485
486   // Replace immutable memtable with the generated Table
487   if (s.ok()) {
488     edit.SetPrevLogNumber(0);
489     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
490     s = versions_->LogAndApply(&edit, &mutex_);
491   }
492
493   if (s.ok()) {
494     // Commit to the new state
495     imm_->Unref();
496     imm_ = NULL;
497     has_imm_.Release_Store(NULL);
498     DeleteObsoleteFiles();
499   }
500
501   return s;
502 }
503
504 void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
505   int max_level_with_files = 1;
506   {
507     MutexLock l(&mutex_);
508     Version* base = versions_->current();
509     for (int level = 1; level < config::kNumLevels; level++) {
510       if (base->OverlapInLevel(level, begin, end)) {
511         max_level_with_files = level;
512       }
513     }
514   }
515   TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
516   for (int level = 0; level < max_level_with_files; level++) {
517     TEST_CompactRange(level, begin, end);
518   }
519 }
520
521 void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
522   assert(level >= 0);
523   assert(level + 1 < config::kNumLevels);
524
525   InternalKey begin_storage, end_storage;
526
527   ManualCompaction manual;
528   manual.level = level;
529   manual.done = false;
530   if (begin == NULL) {
531     manual.begin = NULL;
532   } else {
533     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
534     manual.begin = &begin_storage;
535   }
536   if (end == NULL) {
537     manual.end = NULL;
538   } else {
539     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
540     manual.end = &end_storage;
541   }
542
543   MutexLock l(&mutex_);
544   while (!manual.done) {
545     while (manual_compaction_ != NULL) {
546       bg_cv_.Wait();
547     }
548     manual_compaction_ = &manual;
549     MaybeScheduleCompaction();
550     while (manual_compaction_ == &manual) {
551       bg_cv_.Wait();
552     }
553   }
554 }
555
556 Status DBImpl::TEST_CompactMemTable() {
557   MutexLock l(&mutex_);
558   LoggerId self;
559   AcquireLoggingResponsibility(&self);
560   Status s = MakeRoomForWrite(true /* force compaction */);
561   ReleaseLoggingResponsibility(&self);
562   if (s.ok()) {
563     // Wait until the compaction completes
564     while (imm_ != NULL && bg_error_.ok()) {
565       bg_cv_.Wait();
566     }
567     if (imm_ != NULL) {
568       s = bg_error_;
569     }
570   }
571   return s;
572 }
573
574 void DBImpl::MaybeScheduleCompaction() {
575   mutex_.AssertHeld();
576   if (bg_compaction_scheduled_) {
577     // Already scheduled
578   } else if (shutting_down_.Acquire_Load()) {
579     // DB is being deleted; no more background compactions
580   } else if (imm_ == NULL &&
581              manual_compaction_ == NULL &&
582              !versions_->NeedsCompaction()) {
583     // No work to be done
584   } else {
585     bg_compaction_scheduled_ = true;
586     env_->Schedule(&DBImpl::BGWork, this);
587   }
588 }
589
590 void DBImpl::BGWork(void* db) {
591   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
592 }
593
594 void DBImpl::BackgroundCall() {
595   MutexLock l(&mutex_);
596   assert(bg_compaction_scheduled_);
597   if (!shutting_down_.Acquire_Load()) {
598     BackgroundCompaction();
599   }
600   bg_compaction_scheduled_ = false;
601
602   // Previous compaction may have produced too many files in a level,
603   // so reschedule another compaction if needed.
604   MaybeScheduleCompaction();
605   bg_cv_.SignalAll();
606 }
607
608 void DBImpl::BackgroundCompaction() {
609   mutex_.AssertHeld();
610
611   if (imm_ != NULL) {
612     CompactMemTable();
613     return;
614   }
615
616   Compaction* c;
617   bool is_manual = (manual_compaction_ != NULL);
618   InternalKey manual_end;
619   if (is_manual) {
620     ManualCompaction* m = manual_compaction_;
621     c = versions_->CompactRange(m->level, m->begin, m->end);
622     m->done = (c == NULL);
623     if (c != NULL) {
624       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
625     }
626     Log(options_.info_log,
627         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
628         m->level,
629         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
630         (m->end ? m->end->DebugString().c_str() : "(end)"),
631         (m->done ? "(end)" : manual_end.DebugString().c_str()));
632   } else {
633     c = versions_->PickCompaction();
634   }
635
636   Status status;
637   if (c == NULL) {
638     // Nothing to do
639   } else if (!is_manual && c->IsTrivialMove()) {
640     // Move file to next level
641     assert(c->num_input_files(0) == 1);
642     FileMetaData* f = c->input(0, 0);
643     c->edit()->DeleteFile(c->level(), f->number);
644     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
645                        f->smallest, f->largest);
646     status = versions_->LogAndApply(c->edit(), &mutex_);
647     VersionSet::LevelSummaryStorage tmp;
648     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
649         static_cast<unsigned long long>(f->number),
650         c->level() + 1,
651         static_cast<unsigned long long>(f->file_size),
652         status.ToString().c_str(),
653         versions_->LevelSummary(&tmp));
654   } else {
655     CompactionState* compact = new CompactionState(c);
656     status = DoCompactionWork(compact);
657     CleanupCompaction(compact);
658   }
659   delete c;
660
661   if (status.ok()) {
662     // Done
663   } else if (shutting_down_.Acquire_Load()) {
664     // Ignore compaction errors found during shutting down
665   } else {
666     Log(options_.info_log,
667         "Compaction error: %s", status.ToString().c_str());
668     if (options_.paranoid_checks && bg_error_.ok()) {
669       bg_error_ = status;
670     }
671   }
672
673   if (is_manual) {
674     ManualCompaction* m = manual_compaction_;
675     if (!m->done) {
676       // We only compacted part of the requested range.  Update *m
677       // to the range that is left to be compacted.
678       m->tmp_storage = manual_end;
679       m->begin = &m->tmp_storage;
680     }
681     manual_compaction_ = NULL;
682   }
683 }
684
685 void DBImpl::CleanupCompaction(CompactionState* compact) {
686   mutex_.AssertHeld();
687   if (compact->builder != NULL) {
688     // May happen if we get a shutdown call in the middle of compaction
689     compact->builder->Abandon();
690     delete compact->builder;
691   } else {
692     assert(compact->outfile == NULL);
693   }
694   delete compact->outfile;
695   for (size_t i = 0; i < compact->outputs.size(); i++) {
696     const CompactionState::Output& out = compact->outputs[i];
697     pending_outputs_.erase(out.number);
698   }
699   delete compact;
700 }
701
702 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
703   assert(compact != NULL);
704   assert(compact->builder == NULL);
705   uint64_t file_number;
706   {
707     mutex_.Lock();
708     file_number = versions_->NewFileNumber();
709     pending_outputs_.insert(file_number);
710     CompactionState::Output out;
711     out.number = file_number;
712     out.smallest.Clear();
713     out.largest.Clear();
714     compact->outputs.push_back(out);
715     mutex_.Unlock();
716   }
717
718   // Make the output file
719   std::string fname = TableFileName(dbname_, file_number);
720   Status s = env_->NewWritableFile(fname, &compact->outfile);
721   if (s.ok()) {
722     compact->builder = new TableBuilder(options_, compact->outfile);
723   }
724   return s;
725 }
726
727 Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
728                                           Iterator* input) {
729   assert(compact != NULL);
730   assert(compact->outfile != NULL);
731   assert(compact->builder != NULL);
732
733   const uint64_t output_number = compact->current_output()->number;
734   assert(output_number != 0);
735
736   // Check for iterator errors
737   Status s = input->status();
738   const uint64_t current_entries = compact->builder->NumEntries();
739   if (s.ok()) {
740     s = compact->builder->Finish();
741   } else {
742     compact->builder->Abandon();
743   }
744   const uint64_t current_bytes = compact->builder->FileSize();
745   compact->current_output()->file_size = current_bytes;
746   compact->total_bytes += current_bytes;
747   delete compact->builder;
748   compact->builder = NULL;
749
750   // Finish and check for file errors
751   if (s.ok()) {
752     s = compact->outfile->Sync();
753   }
754   if (s.ok()) {
755     s = compact->outfile->Close();
756   }
757   delete compact->outfile;
758   compact->outfile = NULL;
759
760   if (s.ok() && current_entries > 0) {
761     // Verify that the table is usable
762     Iterator* iter = table_cache_->NewIterator(ReadOptions(),
763                                                output_number,
764                                                current_bytes);
765     s = iter->status();
766     delete iter;
767     if (s.ok()) {
768       Log(options_.info_log,
769           "Generated table #%llu: %lld keys, %lld bytes",
770           (unsigned long long) output_number,
771           (unsigned long long) current_entries,
772           (unsigned long long) current_bytes);
773     }
774   }
775   return s;
776 }
777
778
779 Status DBImpl::InstallCompactionResults(CompactionState* compact) {
780   mutex_.AssertHeld();
781   Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
782       compact->compaction->num_input_files(0),
783       compact->compaction->level(),
784       compact->compaction->num_input_files(1),
785       compact->compaction->level() + 1,
786       static_cast<long long>(compact->total_bytes));
787
788   // Add compaction outputs
789   compact->compaction->AddInputDeletions(compact->compaction->edit());
790   const int level = compact->compaction->level();
791   for (size_t i = 0; i < compact->outputs.size(); i++) {
792     const CompactionState::Output& out = compact->outputs[i];
793     compact->compaction->edit()->AddFile(
794         level + 1,
795         out.number, out.file_size, out.smallest, out.largest);
796     pending_outputs_.erase(out.number);
797   }
798   compact->outputs.clear();
799
800   Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
801   if (s.ok()) {
802     compact->compaction->ReleaseInputs();
803     DeleteObsoleteFiles();
804   } else {
805     // Discard any files we may have created during this failed compaction
806     for (size_t i = 0; i < compact->outputs.size(); i++) {
807       env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
808     }
809   }
810   return s;
811 }
812
813 Status DBImpl::DoCompactionWork(CompactionState* compact) {
814   const uint64_t start_micros = env_->NowMicros();
815   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
816
817   Log(options_.info_log,  "Compacting %d@%d + %d@%d files",
818       compact->compaction->num_input_files(0),
819       compact->compaction->level(),
820       compact->compaction->num_input_files(1),
821       compact->compaction->level() + 1);
822
823   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
824   assert(compact->builder == NULL);
825   assert(compact->outfile == NULL);
826   if (snapshots_.empty()) {
827     compact->smallest_snapshot = versions_->LastSequence();
828   } else {
829     compact->smallest_snapshot = snapshots_.oldest()->number_;
830   }
831
832   // Release mutex while we're actually doing the compaction work
833   mutex_.Unlock();
834
835   Iterator* input = versions_->MakeInputIterator(compact->compaction);
836   input->SeekToFirst();
837   Status status;
838   ParsedInternalKey ikey;
839   std::string current_user_key;
840   bool has_current_user_key = false;
841   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
842   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
843     // Prioritize immutable compaction work
844     if (has_imm_.NoBarrier_Load() != NULL) {
845       const uint64_t imm_start = env_->NowMicros();
846       mutex_.Lock();
847       if (imm_ != NULL) {
848         CompactMemTable();
849         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
850       }
851       mutex_.Unlock();
852       imm_micros += (env_->NowMicros() - imm_start);
853     }
854
855     Slice key = input->key();
856     if (compact->compaction->ShouldStopBefore(key) &&
857         compact->builder != NULL) {
858       status = FinishCompactionOutputFile(compact, input);
859       if (!status.ok()) {
860         break;
861       }
862     }
863
864     // Handle key/value, add to state, etc.
865     bool drop = false;
866     if (!ParseInternalKey(key, &ikey)) {
867       // Do not hide error keys
868       current_user_key.clear();
869       has_current_user_key = false;
870       last_sequence_for_key = kMaxSequenceNumber;
871     } else {
872       if (!has_current_user_key ||
873           user_comparator()->Compare(ikey.user_key,
874                                      Slice(current_user_key)) != 0) {
875         // First occurrence of this user key
876         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
877         has_current_user_key = true;
878         last_sequence_for_key = kMaxSequenceNumber;
879       }
880
881       if (last_sequence_for_key <= compact->smallest_snapshot) {
882         // Hidden by an newer entry for same user key
883         drop = true;    // (A)
884       } else if (ikey.type == kTypeDeletion &&
885                  ikey.sequence <= compact->smallest_snapshot &&
886                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
887         // For this user key:
888         // (1) there is no data in higher levels
889         // (2) data in lower levels will have larger sequence numbers
890         // (3) data in layers that are being compacted here and have
891         //     smaller sequence numbers will be dropped in the next
892         //     few iterations of this loop (by rule (A) above).
893         // Therefore this deletion marker is obsolete and can be dropped.
894         drop = true;
895       }
896
897       last_sequence_for_key = ikey.sequence;
898     }
899 #if 0
900     Log(options_.info_log,
901         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
902         "%d smallest_snapshot: %d",
903         ikey.user_key.ToString().c_str(),
904         (int)ikey.sequence, ikey.type, kTypeValue, drop,
905         compact->compaction->IsBaseLevelForKey(ikey.user_key),
906         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
907 #endif
908
909     if (!drop) {
910       // Open output file if necessary
911       if (compact->builder == NULL) {
912         status = OpenCompactionOutputFile(compact);
913         if (!status.ok()) {
914           break;
915         }
916       }
917       if (compact->builder->NumEntries() == 0) {
918         compact->current_output()->smallest.DecodeFrom(key);
919       }
920       compact->current_output()->largest.DecodeFrom(key);
921       compact->builder->Add(key, input->value());
922
923       // Close output file if it is big enough
924       if (compact->builder->FileSize() >=
925           compact->compaction->MaxOutputFileSize()) {
926         status = FinishCompactionOutputFile(compact, input);
927         if (!status.ok()) {
928           break;
929         }
930       }
931     }
932
933     input->Next();
934   }
935
936   if (status.ok() && shutting_down_.Acquire_Load()) {
937     status = Status::IOError("Deleting DB during compaction");
938   }
939   if (status.ok() && compact->builder != NULL) {
940     status = FinishCompactionOutputFile(compact, input);
941   }
942   if (status.ok()) {
943     status = input->status();
944   }
945   delete input;
946   input = NULL;
947
948   CompactionStats stats;
949   stats.micros = env_->NowMicros() - start_micros - imm_micros;
950   for (int which = 0; which < 2; which++) {
951     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
952       stats.bytes_read += compact->compaction->input(which, i)->file_size;
953     }
954   }
955   for (size_t i = 0; i < compact->outputs.size(); i++) {
956     stats.bytes_written += compact->outputs[i].file_size;
957   }
958
959   mutex_.Lock();
960   stats_[compact->compaction->level() + 1].Add(stats);
961
962   if (status.ok()) {
963     status = InstallCompactionResults(compact);
964   }
965   VersionSet::LevelSummaryStorage tmp;
966   Log(options_.info_log,
967       "compacted to: %s", versions_->LevelSummary(&tmp));
968   return status;
969 }
970
971 namespace {
972 struct IterState {
973   port::Mutex* mu;
974   Version* version;
975   MemTable* mem;
976   MemTable* imm;
977 };
978
979 static void CleanupIteratorState(void* arg1, void* arg2) {
980   IterState* state = reinterpret_cast<IterState*>(arg1);
981   state->mu->Lock();
982   state->mem->Unref();
983   if (state->imm != NULL) state->imm->Unref();
984   state->version->Unref();
985   state->mu->Unlock();
986   delete state;
987 }
988 }  // namespace
989
990 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
991                                       SequenceNumber* latest_snapshot) {
992   IterState* cleanup = new IterState;
993   mutex_.Lock();
994   *latest_snapshot = versions_->LastSequence();
995
996   // Collect together all needed child iterators
997   std::vector<Iterator*> list;
998   list.push_back(mem_->NewIterator());
999   mem_->Ref();
1000   if (imm_ != NULL) {
1001     list.push_back(imm_->NewIterator());
1002     imm_->Ref();
1003   }
1004   versions_->current()->AddIterators(options, &list);
1005   Iterator* internal_iter =
1006       NewMergingIterator(&internal_comparator_, &list[0], list.size());
1007   versions_->current()->Ref();
1008
1009   cleanup->mu = &mutex_;
1010   cleanup->mem = mem_;
1011   cleanup->imm = imm_;
1012   cleanup->version = versions_->current();
1013   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
1014
1015   mutex_.Unlock();
1016   return internal_iter;
1017 }
1018
1019 Iterator* DBImpl::TEST_NewInternalIterator() {
1020   SequenceNumber ignored;
1021   return NewInternalIterator(ReadOptions(), &ignored);
1022 }
1023
1024 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1025   MutexLock l(&mutex_);
1026   return versions_->MaxNextLevelOverlappingBytes();
1027 }
1028
1029 Status DBImpl::Get(const ReadOptions& options,
1030                    const Slice& key,
1031                    std::string* value) {
1032   Status s;
1033   MutexLock l(&mutex_);
1034   SequenceNumber snapshot;
1035   if (options.snapshot != NULL) {
1036     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
1037   } else {
1038     snapshot = versions_->LastSequence();
1039   }
1040
1041   MemTable* mem = mem_;
1042   MemTable* imm = imm_;
1043   Version* current = versions_->current();
1044   mem->Ref();
1045   if (imm != NULL) imm->Ref();
1046   current->Ref();
1047
1048   bool have_stat_update = false;
1049   Version::GetStats stats;
1050
1051   // Unlock while reading from files and memtables
1052   {
1053     mutex_.Unlock();
1054     // First look in the memtable, then in the immutable memtable (if any).
1055     LookupKey lkey(key, snapshot);
1056     if (mem->Get(lkey, value, &s)) {
1057       // Done
1058     } else if (imm != NULL && imm->Get(lkey, value, &s)) {
1059       // Done
1060     } else {
1061       s = current->Get(options, lkey, value, &stats);
1062       have_stat_update = true;
1063     }
1064     mutex_.Lock();
1065   }
1066
1067   if (have_stat_update && current->UpdateStats(stats)) {
1068     MaybeScheduleCompaction();
1069   }
1070   mem->Unref();
1071   if (imm != NULL) imm->Unref();
1072   current->Unref();
1073   return s;
1074 }
1075
1076 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1077   SequenceNumber latest_snapshot;
1078   Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
1079   return NewDBIterator(
1080       &dbname_, env_, user_comparator(), internal_iter,
1081       (options.snapshot != NULL
1082        ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
1083        : latest_snapshot));
1084 }
1085
1086 const Snapshot* DBImpl::GetSnapshot() {
1087   MutexLock l(&mutex_);
1088   return snapshots_.New(versions_->LastSequence());
1089 }
1090
1091 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
1092   MutexLock l(&mutex_);
1093   snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
1094 }
1095
1096 // Convenience methods
1097 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1098   return DB::Put(o, key, val);
1099 }
1100
1101 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1102   return DB::Delete(options, key);
1103 }
1104
1105 // There is at most one thread that is the current logger.  This call
1106 // waits until preceding logger(s) have finished and becomes the
1107 // current logger.
1108 void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
1109   while (logger_ != NULL) {
1110     logger_cv_.Wait();
1111   }
1112   logger_ = self;
1113 }
1114
1115 void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
1116   assert(logger_ == self);
1117   logger_ = NULL;
1118   logger_cv_.SignalAll();
1119 }
1120
1121 Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1122   Status status;
1123   MutexLock l(&mutex_);
1124   LoggerId self;
1125   AcquireLoggingResponsibility(&self);
1126   status = MakeRoomForWrite(false);  // May temporarily release lock and wait
1127   uint64_t last_sequence = versions_->LastSequence();
1128   if (status.ok()) {
1129     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1130     last_sequence += WriteBatchInternal::Count(updates);
1131
1132     // Add to log and apply to memtable.  We can release the lock during
1133     // this phase since the "logger_" flag protects against concurrent
1134     // loggers and concurrent writes into mem_.
1135     {
1136       assert(logger_ == &self);
1137       mutex_.Unlock();
1138       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1139       if (status.ok() && options.sync) {
1140         status = logfile_->Sync();
1141       }
1142       if (status.ok()) {
1143         status = WriteBatchInternal::InsertInto(updates, mem_);
1144       }
1145       mutex_.Lock();
1146       assert(logger_ == &self);
1147     }
1148
1149     versions_->SetLastSequence(last_sequence);
1150   }
1151   ReleaseLoggingResponsibility(&self);
1152   return status;
1153 }
1154
1155 // REQUIRES: mutex_ is held
1156 // REQUIRES: this thread is the current logger
1157 Status DBImpl::MakeRoomForWrite(bool force) {
1158   mutex_.AssertHeld();
1159   assert(logger_ != NULL);
1160   bool allow_delay = !force;
1161   Status s;
1162   while (true) {
1163     if (!bg_error_.ok()) {
1164       // Yield previous error
1165       s = bg_error_;
1166       break;
1167     } else if (
1168         allow_delay &&
1169         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
1170       // We are getting close to hitting a hard limit on the number of
1171       // L0 files.  Rather than delaying a single write by several
1172       // seconds when we hit the hard limit, start delaying each
1173       // individual write by 1ms to reduce latency variance.  Also,
1174       // this delay hands over some CPU to the compaction thread in
1175       // case it is sharing the same core as the writer.
1176       mutex_.Unlock();
1177       env_->SleepForMicroseconds(1000);
1178       allow_delay = false;  // Do not delay a single write more than once
1179       mutex_.Lock();
1180     } else if (!force &&
1181                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1182       // There is room in current memtable
1183       break;
1184     } else if (imm_ != NULL) {
1185       // We have filled up the current memtable, but the previous
1186       // one is still being compacted, so we wait.
1187       bg_cv_.Wait();
1188     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1189       // There are too many level-0 files.
1190       Log(options_.info_log, "waiting...\n");
1191       bg_cv_.Wait();
1192     } else {
1193       // Attempt to switch to a new memtable and trigger compaction of old
1194       assert(versions_->PrevLogNumber() == 0);
1195       uint64_t new_log_number = versions_->NewFileNumber();
1196       WritableFile* lfile = NULL;
1197       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1198       if (!s.ok()) {
1199         break;
1200       }
1201       delete log_;
1202       delete logfile_;
1203       logfile_ = lfile;
1204       logfile_number_ = new_log_number;
1205       log_ = new log::Writer(lfile);
1206       imm_ = mem_;
1207       has_imm_.Release_Store(imm_);
1208       mem_ = new MemTable(internal_comparator_);
1209       mem_->Ref();
1210       force = false;   // Do not force another compaction if have room
1211       MaybeScheduleCompaction();
1212     }
1213   }
1214   return s;
1215 }
1216
1217 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1218   value->clear();
1219
1220   MutexLock l(&mutex_);
1221   Slice in = property;
1222   Slice prefix("leveldb.");
1223   if (!in.starts_with(prefix)) return false;
1224   in.remove_prefix(prefix.size());
1225
1226   if (in.starts_with("num-files-at-level")) {
1227     in.remove_prefix(strlen("num-files-at-level"));
1228     uint64_t level;
1229     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1230     if (!ok || level >= config::kNumLevels) {
1231       return false;
1232     } else {
1233       char buf[100];
1234       snprintf(buf, sizeof(buf), "%d",
1235                versions_->NumLevelFiles(static_cast<int>(level)));
1236       *value = buf;
1237       return true;
1238     }
1239   } else if (in == "stats") {
1240     char buf[200];
1241     snprintf(buf, sizeof(buf),
1242              "                               Compactions\n"
1243              "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1244              "--------------------------------------------------\n"
1245              );
1246     value->append(buf);
1247     for (int level = 0; level < config::kNumLevels; level++) {
1248       int files = versions_->NumLevelFiles(level);
1249       if (stats_[level].micros > 0 || files > 0) {
1250         snprintf(
1251             buf, sizeof(buf),
1252             "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1253             level,
1254             files,
1255             versions_->NumLevelBytes(level) / 1048576.0,
1256             stats_[level].micros / 1e6,
1257             stats_[level].bytes_read / 1048576.0,
1258             stats_[level].bytes_written / 1048576.0);
1259         value->append(buf);
1260       }
1261     }
1262     return true;
1263   } else if (in == "sstables") {
1264     *value = versions_->current()->DebugString();
1265     return true;
1266   }
1267
1268   return false;
1269 }
1270
1271 void DBImpl::GetApproximateSizes(
1272     const Range* range, int n,
1273     uint64_t* sizes) {
1274   // TODO(opt): better implementation
1275   Version* v;
1276   {
1277     MutexLock l(&mutex_);
1278     versions_->current()->Ref();
1279     v = versions_->current();
1280   }
1281
1282   for (int i = 0; i < n; i++) {
1283     // Convert user_key into a corresponding internal key.
1284     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1285     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1286     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1287     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1288     sizes[i] = (limit >= start ? limit - start : 0);
1289   }
1290
1291   {
1292     MutexLock l(&mutex_);
1293     v->Unref();
1294   }
1295 }
1296
1297 // Default implementations of convenience methods that subclasses of DB
1298 // can call if they wish
1299 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1300   WriteBatch batch;
1301   batch.Put(key, value);
1302   return Write(opt, &batch);
1303 }
1304
1305 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1306   WriteBatch batch;
1307   batch.Delete(key);
1308   return Write(opt, &batch);
1309 }
1310
1311 DB::~DB() { }
1312
1313 Status DB::Open(const Options& options, const std::string& dbname,
1314                 DB** dbptr) {
1315   *dbptr = NULL;
1316
1317   DBImpl* impl = new DBImpl(options, dbname);
1318   impl->mutex_.Lock();
1319   VersionEdit edit;
1320   Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
1321   if (s.ok()) {
1322     uint64_t new_log_number = impl->versions_->NewFileNumber();
1323     WritableFile* lfile;
1324     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1325                                      &lfile);
1326     if (s.ok()) {
1327       edit.SetLogNumber(new_log_number);
1328       impl->logfile_ = lfile;
1329       impl->logfile_number_ = new_log_number;
1330       impl->log_ = new log::Writer(lfile);
1331       s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1332     }
1333     if (s.ok()) {
1334       impl->DeleteObsoleteFiles();
1335       impl->MaybeScheduleCompaction();
1336     }
1337   }
1338   impl->mutex_.Unlock();
1339   if (s.ok()) {
1340     *dbptr = impl;
1341   } else {
1342     delete impl;
1343   }
1344   return s;
1345 }
1346
1347 Snapshot::~Snapshot() {
1348 }
1349
1350 Status DestroyDB(const std::string& dbname, const Options& options) {
1351   Env* env = options.env;
1352   std::vector<std::string> filenames;
1353   // Ignore error in case directory does not exist
1354   env->GetChildren(dbname, &filenames);
1355   if (filenames.empty()) {
1356     return Status::OK();
1357   }
1358
1359   FileLock* lock;
1360   const std::string lockname = LockFileName(dbname);
1361   Status result = env->LockFile(lockname, &lock);
1362   if (result.ok()) {
1363     uint64_t number;
1364     FileType type;
1365     for (size_t i = 0; i < filenames.size(); i++) {
1366       if (ParseFileName(filenames[i], &number, &type) &&
1367           filenames[i] != lockname) {  // Lock file will be deleted at end
1368         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1369         if (result.ok() && !del.ok()) {
1370           result = del;
1371         }
1372       }
1373     }
1374     env->UnlockFile(lock);  // Ignore error since state is already gone
1375     env->DeleteFile(lockname);
1376     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
1377   }
1378   return result;
1379 }
1380
1381 }  // namespace leveldb