Initialize Tizen 2.3
[external/leveldb.git] / db / version_set.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/version_set.h"
6
7 #include <algorithm>
8 #include <stdio.h>
9 #include "db/filename.h"
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "db/memtable.h"
13 #include "db/table_cache.h"
14 #include "leveldb/env.h"
15 #include "leveldb/table_builder.h"
16 #include "table/merger.h"
17 #include "table/two_level_iterator.h"
18 #include "util/coding.h"
19 #include "util/logging.h"
20
21 namespace leveldb {
22
23 static const int kTargetFileSize = 2 * 1048576;
24
25 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
26 // stop building a single file in a level->level+1 compaction.
27 static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
28
29 static double MaxBytesForLevel(int level) {
30   // Note: the result for level zero is not really used since we set
31   // the level-0 compaction threshold based on number of files.
32   double result = 10 * 1048576.0;  // Result for both level-0 and level-1
33   while (level > 1) {
34     result *= 10;
35     level--;
36   }
37   return result;
38 }
39
40 static uint64_t MaxFileSizeForLevel(int level) {
41   return kTargetFileSize;  // We could vary per level to reduce number of files?
42 }
43
44 static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
45   int64_t sum = 0;
46   for (size_t i = 0; i < files.size(); i++) {
47     sum += files[i]->file_size;
48   }
49   return sum;
50 }
51
52 namespace {
53 std::string IntSetToString(const std::set<uint64_t>& s) {
54   std::string result = "{";
55   for (std::set<uint64_t>::const_iterator it = s.begin();
56        it != s.end();
57        ++it) {
58     result += (result.size() > 1) ? "," : "";
59     result += NumberToString(*it);
60   }
61   result += "}";
62   return result;
63 }
64 }  // namespace
65
66 Version::~Version() {
67   assert(refs_ == 0);
68
69   // Remove from linked list
70   prev_->next_ = next_;
71   next_->prev_ = prev_;
72
73   // Drop references to files
74   for (int level = 0; level < config::kNumLevels; level++) {
75     for (size_t i = 0; i < files_[level].size(); i++) {
76       FileMetaData* f = files_[level][i];
77       assert(f->refs > 0);
78       f->refs--;
79       if (f->refs <= 0) {
80         delete f;
81       }
82     }
83   }
84 }
85
86 int FindFile(const InternalKeyComparator& icmp,
87              const std::vector<FileMetaData*>& files,
88              const Slice& key) {
89   uint32_t left = 0;
90   uint32_t right = files.size();
91   while (left < right) {
92     uint32_t mid = (left + right) / 2;
93     const FileMetaData* f = files[mid];
94     if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
95       // Key at "mid.largest" is < "target".  Therefore all
96       // files at or before "mid" are uninteresting.
97       left = mid + 1;
98     } else {
99       // Key at "mid.largest" is >= "target".  Therefore all files
100       // after "mid" are uninteresting.
101       right = mid;
102     }
103   }
104   return right;
105 }
106
107 static bool AfterFile(const Comparator* ucmp,
108                       const Slice* user_key, const FileMetaData* f) {
109   // NULL user_key occurs before all keys and is therefore never after *f
110   return (user_key != NULL &&
111           ucmp->Compare(*user_key, f->largest.user_key()) > 0);
112 }
113
114 static bool BeforeFile(const Comparator* ucmp,
115                        const Slice* user_key, const FileMetaData* f) {
116   // NULL user_key occurs after all keys and is therefore never before *f
117   return (user_key != NULL &&
118           ucmp->Compare(*user_key, f->smallest.user_key()) < 0);
119 }
120
121 bool SomeFileOverlapsRange(
122     const InternalKeyComparator& icmp,
123     bool disjoint_sorted_files,
124     const std::vector<FileMetaData*>& files,
125     const Slice* smallest_user_key,
126     const Slice* largest_user_key) {
127   const Comparator* ucmp = icmp.user_comparator();
128   if (!disjoint_sorted_files) {
129     // Need to check against all files
130     for (int i = 0; i < files.size(); i++) {
131       const FileMetaData* f = files[i];
132       if (AfterFile(ucmp, smallest_user_key, f) ||
133           BeforeFile(ucmp, largest_user_key, f)) {
134         // No overlap
135       } else {
136         return true;  // Overlap
137       }
138     }
139     return false;
140   }
141
142   // Binary search over file list
143   uint32_t index = 0;
144   if (smallest_user_key != NULL) {
145     // Find the earliest possible internal key for smallest_user_key
146     InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
147     index = FindFile(icmp, files, small.Encode());
148   }
149
150   if (index >= files.size()) {
151     // beginning of range is after all files, so no overlap.
152     return false;
153   }
154
155   return !BeforeFile(ucmp, largest_user_key, files[index]);
156 }
157
158 // An internal iterator.  For a given version/level pair, yields
159 // information about the files in the level.  For a given entry, key()
160 // is the largest key that occurs in the file, and value() is an
161 // 16-byte value containing the file number and file size, both
162 // encoded using EncodeFixed64.
163 class Version::LevelFileNumIterator : public Iterator {
164  public:
165   LevelFileNumIterator(const InternalKeyComparator& icmp,
166                        const std::vector<FileMetaData*>* flist)
167       : icmp_(icmp),
168         flist_(flist),
169         index_(flist->size()) {        // Marks as invalid
170   }
171   virtual bool Valid() const {
172     return index_ < flist_->size();
173   }
174   virtual void Seek(const Slice& target) {
175     index_ = FindFile(icmp_, *flist_, target);
176   }
177   virtual void SeekToFirst() { index_ = 0; }
178   virtual void SeekToLast() {
179     index_ = flist_->empty() ? 0 : flist_->size() - 1;
180   }
181   virtual void Next() {
182     assert(Valid());
183     index_++;
184   }
185   virtual void Prev() {
186     assert(Valid());
187     if (index_ == 0) {
188       index_ = flist_->size();  // Marks as invalid
189     } else {
190       index_--;
191     }
192   }
193   Slice key() const {
194     assert(Valid());
195     return (*flist_)[index_]->largest.Encode();
196   }
197   Slice value() const {
198     assert(Valid());
199     EncodeFixed64(value_buf_, (*flist_)[index_]->number);
200     EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
201     return Slice(value_buf_, sizeof(value_buf_));
202   }
203   virtual Status status() const { return Status::OK(); }
204  private:
205   const InternalKeyComparator icmp_;
206   const std::vector<FileMetaData*>* const flist_;
207   uint32_t index_;
208
209   // Backing store for value().  Holds the file number and size.
210   mutable char value_buf_[16];
211 };
212
213 static Iterator* GetFileIterator(void* arg,
214                                  const ReadOptions& options,
215                                  const Slice& file_value) {
216   TableCache* cache = reinterpret_cast<TableCache*>(arg);
217   if (file_value.size() != 16) {
218     return NewErrorIterator(
219         Status::Corruption("FileReader invoked with unexpected value"));
220   } else {
221     return cache->NewIterator(options,
222                               DecodeFixed64(file_value.data()),
223                               DecodeFixed64(file_value.data() + 8));
224   }
225 }
226
227 Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
228                                             int level) const {
229   return NewTwoLevelIterator(
230       new LevelFileNumIterator(vset_->icmp_, &files_[level]),
231       &GetFileIterator, vset_->table_cache_, options);
232 }
233
234 void Version::AddIterators(const ReadOptions& options,
235                            std::vector<Iterator*>* iters) {
236   // Merge all level zero files together since they may overlap
237   for (size_t i = 0; i < files_[0].size(); i++) {
238     iters->push_back(
239         vset_->table_cache_->NewIterator(
240             options, files_[0][i]->number, files_[0][i]->file_size));
241   }
242
243   // For levels > 0, we can use a concatenating iterator that sequentially
244   // walks through the non-overlapping files in the level, opening them
245   // lazily.
246   for (int level = 1; level < config::kNumLevels; level++) {
247     if (!files_[level].empty()) {
248       iters->push_back(NewConcatenatingIterator(options, level));
249     }
250   }
251 }
252
253 // If "*iter" points at a value or deletion for user_key, store
254 // either the value, or a NotFound error and return true.
255 // Else return false.
256 static bool GetValue(const Comparator* cmp,
257                      Iterator* iter, const Slice& user_key,
258                      std::string* value,
259                      Status* s) {
260   if (!iter->Valid()) {
261     return false;
262   }
263   ParsedInternalKey parsed_key;
264   if (!ParseInternalKey(iter->key(), &parsed_key)) {
265     *s = Status::Corruption("corrupted key for ", user_key);
266     return true;
267   }
268   if (cmp->Compare(parsed_key.user_key, user_key) != 0) {
269     return false;
270   }
271   switch (parsed_key.type) {
272     case kTypeDeletion:
273       *s = Status::NotFound(Slice());  // Use an empty error message for speed
274       break;
275     case kTypeValue: {
276       Slice v = iter->value();
277       value->assign(v.data(), v.size());
278       break;
279     }
280   }
281   return true;
282 }
283
284 static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
285   return a->number > b->number;
286 }
287
288 Status Version::Get(const ReadOptions& options,
289                     const LookupKey& k,
290                     std::string* value,
291                     GetStats* stats) {
292   Slice ikey = k.internal_key();
293   Slice user_key = k.user_key();
294   const Comparator* ucmp = vset_->icmp_.user_comparator();
295   Status s;
296
297   stats->seek_file = NULL;
298   stats->seek_file_level = -1;
299   FileMetaData* last_file_read = NULL;
300   int last_file_read_level = -1;
301
302   // We can search level-by-level since entries never hop across
303   // levels.  Therefore we are guaranteed that if we find data
304   // in an smaller level, later levels are irrelevant.
305   std::vector<FileMetaData*> tmp;
306   FileMetaData* tmp2;
307   for (int level = 0; level < config::kNumLevels; level++) {
308     size_t num_files = files_[level].size();
309     if (num_files == 0) continue;
310
311     // Get the list of files to search in this level
312     FileMetaData* const* files = &files_[level][0];
313     if (level == 0) {
314       // Level-0 files may overlap each other.  Find all files that
315       // overlap user_key and process them in order from newest to oldest.
316       tmp.reserve(num_files);
317       for (uint32_t i = 0; i < num_files; i++) {
318         FileMetaData* f = files[i];
319         if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
320             ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
321           tmp.push_back(f);
322         }
323       }
324       if (tmp.empty()) continue;
325
326       std::sort(tmp.begin(), tmp.end(), NewestFirst);
327       files = &tmp[0];
328       num_files = tmp.size();
329     } else {
330       // Binary search to find earliest index whose largest key >= ikey.
331       uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
332       if (index >= num_files) {
333         files = NULL;
334         num_files = 0;
335       } else {
336         tmp2 = files[index];
337         if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
338           // All of "tmp2" is past any data for user_key
339           files = NULL;
340           num_files = 0;
341         } else {
342           files = &tmp2;
343           num_files = 1;
344         }
345       }
346     }
347
348     for (uint32_t i = 0; i < num_files; ++i) {
349       if (last_file_read != NULL && stats->seek_file == NULL) {
350         // We have had more than one seek for this read.  Charge the 1st file.
351         stats->seek_file = last_file_read;
352         stats->seek_file_level = last_file_read_level;
353       }
354
355       FileMetaData* f = files[i];
356       last_file_read = f;
357       last_file_read_level = level;
358
359       Iterator* iter = vset_->table_cache_->NewIterator(
360           options,
361           f->number,
362           f->file_size);
363       iter->Seek(ikey);
364       const bool done = GetValue(ucmp, iter, user_key, value, &s);
365       if (!iter->status().ok()) {
366         s = iter->status();
367         delete iter;
368         return s;
369       } else {
370         delete iter;
371         if (done) {
372           return s;
373         }
374       }
375     }
376   }
377
378   return Status::NotFound(Slice());  // Use an empty error message for speed
379 }
380
381 bool Version::UpdateStats(const GetStats& stats) {
382   FileMetaData* f = stats.seek_file;
383   if (f != NULL) {
384     f->allowed_seeks--;
385     if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
386       file_to_compact_ = f;
387       file_to_compact_level_ = stats.seek_file_level;
388       return true;
389     }
390   }
391   return false;
392 }
393
394 void Version::Ref() {
395   ++refs_;
396 }
397
398 void Version::Unref() {
399   assert(this != &vset_->dummy_versions_);
400   assert(refs_ >= 1);
401   --refs_;
402   if (refs_ == 0) {
403     delete this;
404   }
405 }
406
407 bool Version::OverlapInLevel(int level,
408                              const Slice* smallest_user_key,
409                              const Slice* largest_user_key) {
410   return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
411                                smallest_user_key, largest_user_key);
412 }
413
414 int Version::PickLevelForMemTableOutput(
415     const Slice& smallest_user_key,
416     const Slice& largest_user_key) {
417   int level = 0;
418   if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
419     // Push to next level if there is no overlap in next level,
420     // and the #bytes overlapping in the level after that are limited.
421     InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
422     InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
423     std::vector<FileMetaData*> overlaps;
424     while (level < config::kMaxMemCompactLevel) {
425       if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
426         break;
427       }
428       GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
429       const int64_t sum = TotalFileSize(overlaps);
430       if (sum > kMaxGrandParentOverlapBytes) {
431         break;
432       }
433       level++;
434     }
435   }
436   return level;
437 }
438
439 // Store in "*inputs" all files in "level" that overlap [begin,end]
440 void Version::GetOverlappingInputs(
441     int level,
442     const InternalKey* begin,
443     const InternalKey* end,
444     std::vector<FileMetaData*>* inputs) {
445   inputs->clear();
446   Slice user_begin, user_end;
447   if (begin != NULL) {
448     user_begin = begin->user_key();
449   }
450   if (end != NULL) {
451     user_end = end->user_key();
452   }
453   const Comparator* user_cmp = vset_->icmp_.user_comparator();
454   for (size_t i = 0; i < files_[level].size(); ) {
455     FileMetaData* f = files_[level][i++];
456     const Slice file_start = f->smallest.user_key();
457     const Slice file_limit = f->largest.user_key();
458     if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
459       // "f" is completely before specified range; skip it
460     } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
461       // "f" is completely after specified range; skip it
462     } else {
463       inputs->push_back(f);
464       if (level == 0) {
465         // Level-0 files may overlap each other.  So check if the newly
466         // added file has expanded the range.  If so, restart search.
467         if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
468           user_begin = file_start;
469           inputs->clear();
470           i = 0;
471         } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
472           user_end = file_limit;
473           inputs->clear();
474           i = 0;
475         }
476       }
477     }
478   }
479 }
480
481 std::string Version::DebugString() const {
482   std::string r;
483   for (int level = 0; level < config::kNumLevels; level++) {
484     // E.g.,
485     //   --- level 1 ---
486     //   17:123['a' .. 'd']
487     //   20:43['e' .. 'g']
488     r.append("--- level ");
489     AppendNumberTo(&r, level);
490     r.append(" ---\n");
491     const std::vector<FileMetaData*>& files = files_[level];
492     for (size_t i = 0; i < files.size(); i++) {
493       r.push_back(' ');
494       AppendNumberTo(&r, files[i]->number);
495       r.push_back(':');
496       AppendNumberTo(&r, files[i]->file_size);
497       r.append("[");
498       r.append(files[i]->smallest.DebugString());
499       r.append(" .. ");
500       r.append(files[i]->largest.DebugString());
501       r.append("]\n");
502     }
503   }
504   return r;
505 }
506
507 // A helper class so we can efficiently apply a whole sequence
508 // of edits to a particular state without creating intermediate
509 // Versions that contain full copies of the intermediate state.
510 class VersionSet::Builder {
511  private:
512   // Helper to sort by v->files_[file_number].smallest
513   struct BySmallestKey {
514     const InternalKeyComparator* internal_comparator;
515
516     bool operator()(FileMetaData* f1, FileMetaData* f2) const {
517       int r = internal_comparator->Compare(f1->smallest, f2->smallest);
518       if (r != 0) {
519         return (r < 0);
520       } else {
521         // Break ties by file number
522         return (f1->number < f2->number);
523       }
524     }
525   };
526
527   typedef std::set<FileMetaData*, BySmallestKey> FileSet;
528   struct LevelState {
529     std::set<uint64_t> deleted_files;
530     FileSet* added_files;
531   };
532
533   VersionSet* vset_;
534   Version* base_;
535   LevelState levels_[config::kNumLevels];
536
537  public:
538   // Initialize a builder with the files from *base and other info from *vset
539   Builder(VersionSet* vset, Version* base)
540       : vset_(vset),
541         base_(base) {
542     base_->Ref();
543     BySmallestKey cmp;
544     cmp.internal_comparator = &vset_->icmp_;
545     for (int level = 0; level < config::kNumLevels; level++) {
546       levels_[level].added_files = new FileSet(cmp);
547     }
548   }
549
550   ~Builder() {
551     for (int level = 0; level < config::kNumLevels; level++) {
552       const FileSet* added = levels_[level].added_files;
553       std::vector<FileMetaData*> to_unref;
554       to_unref.reserve(added->size());
555       for (FileSet::const_iterator it = added->begin();
556           it != added->end(); ++it) {
557         to_unref.push_back(*it);
558       }
559       delete added;
560       for (uint32_t i = 0; i < to_unref.size(); i++) {
561         FileMetaData* f = to_unref[i];
562         f->refs--;
563         if (f->refs <= 0) {
564           delete f;
565         }
566       }
567     }
568     base_->Unref();
569   }
570
571   // Apply all of the edits in *edit to the current state.
572   void Apply(VersionEdit* edit) {
573     // Update compaction pointers
574     for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
575       const int level = edit->compact_pointers_[i].first;
576       vset_->compact_pointer_[level] =
577           edit->compact_pointers_[i].second.Encode().ToString();
578     }
579
580     // Delete files
581     const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
582     for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
583          iter != del.end();
584          ++iter) {
585       const int level = iter->first;
586       const uint64_t number = iter->second;
587       levels_[level].deleted_files.insert(number);
588     }
589
590     // Add new files
591     for (size_t i = 0; i < edit->new_files_.size(); i++) {
592       const int level = edit->new_files_[i].first;
593       FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
594       f->refs = 1;
595
596       // We arrange to automatically compact this file after
597       // a certain number of seeks.  Let's assume:
598       //   (1) One seek costs 10ms
599       //   (2) Writing or reading 1MB costs 10ms (100MB/s)
600       //   (3) A compaction of 1MB does 25MB of IO:
601       //         1MB read from this level
602       //         10-12MB read from next level (boundaries may be misaligned)
603       //         10-12MB written to next level
604       // This implies that 25 seeks cost the same as the compaction
605       // of 1MB of data.  I.e., one seek costs approximately the
606       // same as the compaction of 40KB of data.  We are a little
607       // conservative and allow approximately one seek for every 16KB
608       // of data before triggering a compaction.
609       f->allowed_seeks = (f->file_size / 16384);
610       if (f->allowed_seeks < 100) f->allowed_seeks = 100;
611
612       levels_[level].deleted_files.erase(f->number);
613       levels_[level].added_files->insert(f);
614     }
615   }
616
617   // Save the current state in *v.
618   void SaveTo(Version* v) {
619     BySmallestKey cmp;
620     cmp.internal_comparator = &vset_->icmp_;
621     for (int level = 0; level < config::kNumLevels; level++) {
622       // Merge the set of added files with the set of pre-existing files.
623       // Drop any deleted files.  Store the result in *v.
624       const std::vector<FileMetaData*>& base_files = base_->files_[level];
625       std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
626       std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
627       const FileSet* added = levels_[level].added_files;
628       v->files_[level].reserve(base_files.size() + added->size());
629       for (FileSet::const_iterator added_iter = added->begin();
630            added_iter != added->end();
631            ++added_iter) {
632         // Add all smaller files listed in base_
633         for (std::vector<FileMetaData*>::const_iterator bpos
634                  = std::upper_bound(base_iter, base_end, *added_iter, cmp);
635              base_iter != bpos;
636              ++base_iter) {
637           MaybeAddFile(v, level, *base_iter);
638         }
639
640         MaybeAddFile(v, level, *added_iter);
641       }
642
643       // Add remaining base files
644       for (; base_iter != base_end; ++base_iter) {
645         MaybeAddFile(v, level, *base_iter);
646       }
647
648 #ifndef NDEBUG
649       // Make sure there is no overlap in levels > 0
650       if (level > 0) {
651         for (uint32_t i = 1; i < v->files_[level].size(); i++) {
652           const InternalKey& prev_end = v->files_[level][i-1]->largest;
653           const InternalKey& this_begin = v->files_[level][i]->smallest;
654           if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
655             fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
656                     prev_end.DebugString().c_str(),
657                     this_begin.DebugString().c_str());
658             abort();
659           }
660         }
661       }
662 #endif
663     }
664   }
665
666   void MaybeAddFile(Version* v, int level, FileMetaData* f) {
667     if (levels_[level].deleted_files.count(f->number) > 0) {
668       // File is deleted: do nothing
669     } else {
670       std::vector<FileMetaData*>* files = &v->files_[level];
671       if (level > 0 && !files->empty()) {
672         // Must not overlap
673         assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
674                                     f->smallest) < 0);
675       }
676       f->refs++;
677       files->push_back(f);
678     }
679   }
680 };
681
682 VersionSet::VersionSet(const std::string& dbname,
683                        const Options* options,
684                        TableCache* table_cache,
685                        const InternalKeyComparator* cmp)
686     : env_(options->env),
687       dbname_(dbname),
688       options_(options),
689       table_cache_(table_cache),
690       icmp_(*cmp),
691       next_file_number_(2),
692       manifest_file_number_(0),  // Filled by Recover()
693       last_sequence_(0),
694       log_number_(0),
695       prev_log_number_(0),
696       descriptor_file_(NULL),
697       descriptor_log_(NULL),
698       dummy_versions_(this),
699       current_(NULL) {
700   AppendVersion(new Version(this));
701 }
702
703 VersionSet::~VersionSet() {
704   current_->Unref();
705   assert(dummy_versions_.next_ == &dummy_versions_);  // List must be empty
706   delete descriptor_log_;
707   delete descriptor_file_;
708 }
709
710 void VersionSet::AppendVersion(Version* v) {
711   // Make "v" current
712   assert(v->refs_ == 0);
713   assert(v != current_);
714   if (current_ != NULL) {
715     current_->Unref();
716   }
717   current_ = v;
718   v->Ref();
719
720   // Append to linked list
721   v->prev_ = dummy_versions_.prev_;
722   v->next_ = &dummy_versions_;
723   v->prev_->next_ = v;
724   v->next_->prev_ = v;
725 }
726
727 Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
728   if (edit->has_log_number_) {
729     assert(edit->log_number_ >= log_number_);
730     assert(edit->log_number_ < next_file_number_);
731   } else {
732     edit->SetLogNumber(log_number_);
733   }
734
735   if (!edit->has_prev_log_number_) {
736     edit->SetPrevLogNumber(prev_log_number_);
737   }
738
739   edit->SetNextFile(next_file_number_);
740   edit->SetLastSequence(last_sequence_);
741
742   Version* v = new Version(this);
743   {
744     Builder builder(this, current_);
745     builder.Apply(edit);
746     builder.SaveTo(v);
747   }
748   Finalize(v);
749
750   // Initialize new descriptor log file if necessary by creating
751   // a temporary file that contains a snapshot of the current version.
752   std::string new_manifest_file;
753   Status s;
754   if (descriptor_log_ == NULL) {
755     // No reason to unlock *mu here since we only hit this path in the
756     // first call to LogAndApply (when opening the database).
757     assert(descriptor_file_ == NULL);
758     new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
759     edit->SetNextFile(next_file_number_);
760     s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
761     if (s.ok()) {
762       descriptor_log_ = new log::Writer(descriptor_file_);
763       s = WriteSnapshot(descriptor_log_);
764     }
765   }
766
767   // Unlock during expensive MANIFEST log write
768   {
769     mu->Unlock();
770
771     // Write new record to MANIFEST log
772     if (s.ok()) {
773       std::string record;
774       edit->EncodeTo(&record);
775       s = descriptor_log_->AddRecord(record);
776       if (s.ok()) {
777         s = descriptor_file_->Sync();
778       }
779     }
780
781     // If we just created a new descriptor file, install it by writing a
782     // new CURRENT file that points to it.
783     if (s.ok() && !new_manifest_file.empty()) {
784       s = SetCurrentFile(env_, dbname_, manifest_file_number_);
785     }
786
787     mu->Lock();
788   }
789
790   // Install the new version
791   if (s.ok()) {
792     AppendVersion(v);
793     log_number_ = edit->log_number_;
794     prev_log_number_ = edit->prev_log_number_;
795   } else {
796     delete v;
797     if (!new_manifest_file.empty()) {
798       delete descriptor_log_;
799       delete descriptor_file_;
800       descriptor_log_ = NULL;
801       descriptor_file_ = NULL;
802       env_->DeleteFile(new_manifest_file);
803     }
804   }
805
806   return s;
807 }
808
809 Status VersionSet::Recover() {
810   struct LogReporter : public log::Reader::Reporter {
811     Status* status;
812     virtual void Corruption(size_t bytes, const Status& s) {
813       if (this->status->ok()) *this->status = s;
814     }
815   };
816
817   // Read "CURRENT" file, which contains a pointer to the current manifest file
818   std::string current;
819   Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
820   if (!s.ok()) {
821     return s;
822   }
823   if (current.empty() || current[current.size()-1] != '\n') {
824     return Status::Corruption("CURRENT file does not end with newline");
825   }
826   current.resize(current.size() - 1);
827
828   std::string dscname = dbname_ + "/" + current;
829   SequentialFile* file;
830   s = env_->NewSequentialFile(dscname, &file);
831   if (!s.ok()) {
832     return s;
833   }
834
835   bool have_log_number = false;
836   bool have_prev_log_number = false;
837   bool have_next_file = false;
838   bool have_last_sequence = false;
839   uint64_t next_file = 0;
840   uint64_t last_sequence = 0;
841   uint64_t log_number = 0;
842   uint64_t prev_log_number = 0;
843   Builder builder(this, current_);
844
845   {
846     LogReporter reporter;
847     reporter.status = &s;
848     log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
849     Slice record;
850     std::string scratch;
851     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
852       VersionEdit edit;
853       s = edit.DecodeFrom(record);
854       if (s.ok()) {
855         if (edit.has_comparator_ &&
856             edit.comparator_ != icmp_.user_comparator()->Name()) {
857           s = Status::InvalidArgument(
858               edit.comparator_ + "does not match existing comparator ",
859               icmp_.user_comparator()->Name());
860         }
861       }
862
863       if (s.ok()) {
864         builder.Apply(&edit);
865       }
866
867       if (edit.has_log_number_) {
868         log_number = edit.log_number_;
869         have_log_number = true;
870       }
871
872       if (edit.has_prev_log_number_) {
873         prev_log_number = edit.prev_log_number_;
874         have_prev_log_number = true;
875       }
876
877       if (edit.has_next_file_number_) {
878         next_file = edit.next_file_number_;
879         have_next_file = true;
880       }
881
882       if (edit.has_last_sequence_) {
883         last_sequence = edit.last_sequence_;
884         have_last_sequence = true;
885       }
886     }
887   }
888   delete file;
889   file = NULL;
890
891   if (s.ok()) {
892     if (!have_next_file) {
893       s = Status::Corruption("no meta-nextfile entry in descriptor");
894     } else if (!have_log_number) {
895       s = Status::Corruption("no meta-lognumber entry in descriptor");
896     } else if (!have_last_sequence) {
897       s = Status::Corruption("no last-sequence-number entry in descriptor");
898     }
899
900     if (!have_prev_log_number) {
901       prev_log_number = 0;
902     }
903
904     MarkFileNumberUsed(prev_log_number);
905     MarkFileNumberUsed(log_number);
906   }
907
908   if (s.ok()) {
909     Version* v = new Version(this);
910     builder.SaveTo(v);
911     // Install recovered version
912     Finalize(v);
913     AppendVersion(v);
914     manifest_file_number_ = next_file;
915     next_file_number_ = next_file + 1;
916     last_sequence_ = last_sequence;
917     log_number_ = log_number;
918     prev_log_number_ = prev_log_number;
919   }
920
921   return s;
922 }
923
924 void VersionSet::MarkFileNumberUsed(uint64_t number) {
925   if (next_file_number_ <= number) {
926     next_file_number_ = number + 1;
927   }
928 }
929
930 void VersionSet::Finalize(Version* v) {
931   // Precomputed best level for next compaction
932   int best_level = -1;
933   double best_score = -1;
934
935   for (int level = 0; level < config::kNumLevels-1; level++) {
936     double score;
937     if (level == 0) {
938       // We treat level-0 specially by bounding the number of files
939       // instead of number of bytes for two reasons:
940       //
941       // (1) With larger write-buffer sizes, it is nice not to do too
942       // many level-0 compactions.
943       //
944       // (2) The files in level-0 are merged on every read and
945       // therefore we wish to avoid too many files when the individual
946       // file size is small (perhaps because of a small write-buffer
947       // setting, or very high compression ratios, or lots of
948       // overwrites/deletions).
949       score = v->files_[level].size() /
950           static_cast<double>(config::kL0_CompactionTrigger);
951     } else {
952       // Compute the ratio of current size to size limit.
953       const uint64_t level_bytes = TotalFileSize(v->files_[level]);
954       score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
955     }
956
957     if (score > best_score) {
958       best_level = level;
959       best_score = score;
960     }
961   }
962
963   v->compaction_level_ = best_level;
964   v->compaction_score_ = best_score;
965 }
966
967 Status VersionSet::WriteSnapshot(log::Writer* log) {
968   // TODO: Break up into multiple records to reduce memory usage on recovery?
969
970   // Save metadata
971   VersionEdit edit;
972   edit.SetComparatorName(icmp_.user_comparator()->Name());
973
974   // Save compaction pointers
975   for (int level = 0; level < config::kNumLevels; level++) {
976     if (!compact_pointer_[level].empty()) {
977       InternalKey key;
978       key.DecodeFrom(compact_pointer_[level]);
979       edit.SetCompactPointer(level, key);
980     }
981   }
982
983   // Save files
984   for (int level = 0; level < config::kNumLevels; level++) {
985     const std::vector<FileMetaData*>& files = current_->files_[level];
986     for (size_t i = 0; i < files.size(); i++) {
987       const FileMetaData* f = files[i];
988       edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
989     }
990   }
991
992   std::string record;
993   edit.EncodeTo(&record);
994   return log->AddRecord(record);
995 }
996
997 int VersionSet::NumLevelFiles(int level) const {
998   assert(level >= 0);
999   assert(level < config::kNumLevels);
1000   return current_->files_[level].size();
1001 }
1002
1003 const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
1004   // Update code if kNumLevels changes
1005   assert(config::kNumLevels == 7);
1006   snprintf(scratch->buffer, sizeof(scratch->buffer),
1007            "files[ %d %d %d %d %d %d %d ]",
1008            int(current_->files_[0].size()),
1009            int(current_->files_[1].size()),
1010            int(current_->files_[2].size()),
1011            int(current_->files_[3].size()),
1012            int(current_->files_[4].size()),
1013            int(current_->files_[5].size()),
1014            int(current_->files_[6].size()));
1015   return scratch->buffer;
1016 }
1017
1018 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
1019   uint64_t result = 0;
1020   for (int level = 0; level < config::kNumLevels; level++) {
1021     const std::vector<FileMetaData*>& files = v->files_[level];
1022     for (size_t i = 0; i < files.size(); i++) {
1023       if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
1024         // Entire file is before "ikey", so just add the file size
1025         result += files[i]->file_size;
1026       } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
1027         // Entire file is after "ikey", so ignore
1028         if (level > 0) {
1029           // Files other than level 0 are sorted by meta->smallest, so
1030           // no further files in this level will contain data for
1031           // "ikey".
1032           break;
1033         }
1034       } else {
1035         // "ikey" falls in the range for this table.  Add the
1036         // approximate offset of "ikey" within the table.
1037         Table* tableptr;
1038         Iterator* iter = table_cache_->NewIterator(
1039             ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
1040         if (tableptr != NULL) {
1041           result += tableptr->ApproximateOffsetOf(ikey.Encode());
1042         }
1043         delete iter;
1044       }
1045     }
1046   }
1047   return result;
1048 }
1049
1050 void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
1051   for (Version* v = dummy_versions_.next_;
1052        v != &dummy_versions_;
1053        v = v->next_) {
1054     for (int level = 0; level < config::kNumLevels; level++) {
1055       const std::vector<FileMetaData*>& files = v->files_[level];
1056       for (size_t i = 0; i < files.size(); i++) {
1057         live->insert(files[i]->number);
1058       }
1059     }
1060   }
1061 }
1062
1063 int64_t VersionSet::NumLevelBytes(int level) const {
1064   assert(level >= 0);
1065   assert(level < config::kNumLevels);
1066   return TotalFileSize(current_->files_[level]);
1067 }
1068
1069 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
1070   int64_t result = 0;
1071   std::vector<FileMetaData*> overlaps;
1072   for (int level = 1; level < config::kNumLevels - 1; level++) {
1073     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1074       const FileMetaData* f = current_->files_[level][i];
1075       current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
1076                                      &overlaps);
1077       const int64_t sum = TotalFileSize(overlaps);
1078       if (sum > result) {
1079         result = sum;
1080       }
1081     }
1082   }
1083   return result;
1084 }
1085
1086 // Stores the minimal range that covers all entries in inputs in
1087 // *smallest, *largest.
1088 // REQUIRES: inputs is not empty
1089 void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
1090                           InternalKey* smallest,
1091                           InternalKey* largest) {
1092   assert(!inputs.empty());
1093   smallest->Clear();
1094   largest->Clear();
1095   for (size_t i = 0; i < inputs.size(); i++) {
1096     FileMetaData* f = inputs[i];
1097     if (i == 0) {
1098       *smallest = f->smallest;
1099       *largest = f->largest;
1100     } else {
1101       if (icmp_.Compare(f->smallest, *smallest) < 0) {
1102         *smallest = f->smallest;
1103       }
1104       if (icmp_.Compare(f->largest, *largest) > 0) {
1105         *largest = f->largest;
1106       }
1107     }
1108   }
1109 }
1110
1111 // Stores the minimal range that covers all entries in inputs1 and inputs2
1112 // in *smallest, *largest.
1113 // REQUIRES: inputs is not empty
1114 void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
1115                            const std::vector<FileMetaData*>& inputs2,
1116                            InternalKey* smallest,
1117                            InternalKey* largest) {
1118   std::vector<FileMetaData*> all = inputs1;
1119   all.insert(all.end(), inputs2.begin(), inputs2.end());
1120   GetRange(all, smallest, largest);
1121 }
1122
1123 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
1124   ReadOptions options;
1125   options.verify_checksums = options_->paranoid_checks;
1126   options.fill_cache = false;
1127
1128   // Level-0 files have to be merged together.  For other levels,
1129   // we will make a concatenating iterator per level.
1130   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
1131   const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
1132   Iterator** list = new Iterator*[space];
1133   int num = 0;
1134   for (int which = 0; which < 2; which++) {
1135     if (!c->inputs_[which].empty()) {
1136       if (c->level() + which == 0) {
1137         const std::vector<FileMetaData*>& files = c->inputs_[which];
1138         for (size_t i = 0; i < files.size(); i++) {
1139           list[num++] = table_cache_->NewIterator(
1140               options, files[i]->number, files[i]->file_size);
1141         }
1142       } else {
1143         // Create concatenating iterator for the files from this level
1144         list[num++] = NewTwoLevelIterator(
1145             new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
1146             &GetFileIterator, table_cache_, options);
1147       }
1148     }
1149   }
1150   assert(num <= space);
1151   Iterator* result = NewMergingIterator(&icmp_, list, num);
1152   delete[] list;
1153   return result;
1154 }
1155
1156 Compaction* VersionSet::PickCompaction() {
1157   Compaction* c;
1158   int level;
1159
1160   // We prefer compactions triggered by too much data in a level over
1161   // the compactions triggered by seeks.
1162   const bool size_compaction = (current_->compaction_score_ >= 1);
1163   const bool seek_compaction = (current_->file_to_compact_ != NULL);
1164   if (size_compaction) {
1165     level = current_->compaction_level_;
1166     assert(level >= 0);
1167     assert(level+1 < config::kNumLevels);
1168     c = new Compaction(level);
1169
1170     // Pick the first file that comes after compact_pointer_[level]
1171     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1172       FileMetaData* f = current_->files_[level][i];
1173       if (compact_pointer_[level].empty() ||
1174           icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
1175         c->inputs_[0].push_back(f);
1176         break;
1177       }
1178     }
1179     if (c->inputs_[0].empty()) {
1180       // Wrap-around to the beginning of the key space
1181       c->inputs_[0].push_back(current_->files_[level][0]);
1182     }
1183   } else if (seek_compaction) {
1184     level = current_->file_to_compact_level_;
1185     c = new Compaction(level);
1186     c->inputs_[0].push_back(current_->file_to_compact_);
1187   } else {
1188     return NULL;
1189   }
1190
1191   c->input_version_ = current_;
1192   c->input_version_->Ref();
1193
1194   // Files in level 0 may overlap each other, so pick up all overlapping ones
1195   if (level == 0) {
1196     InternalKey smallest, largest;
1197     GetRange(c->inputs_[0], &smallest, &largest);
1198     // Note that the next call will discard the file we placed in
1199     // c->inputs_[0] earlier and replace it with an overlapping set
1200     // which will include the picked file.
1201     current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
1202     assert(!c->inputs_[0].empty());
1203   }
1204
1205   SetupOtherInputs(c);
1206
1207   return c;
1208 }
1209
1210 void VersionSet::SetupOtherInputs(Compaction* c) {
1211   const int level = c->level();
1212   InternalKey smallest, largest;
1213   GetRange(c->inputs_[0], &smallest, &largest);
1214
1215   current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
1216
1217   // Get entire range covered by compaction
1218   InternalKey all_start, all_limit;
1219   GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1220
1221   // See if we can grow the number of inputs in "level" without
1222   // changing the number of "level+1" files we pick up.
1223   if (!c->inputs_[1].empty()) {
1224     std::vector<FileMetaData*> expanded0;
1225     current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
1226     if (expanded0.size() > c->inputs_[0].size()) {
1227       InternalKey new_start, new_limit;
1228       GetRange(expanded0, &new_start, &new_limit);
1229       std::vector<FileMetaData*> expanded1;
1230       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
1231                                      &expanded1);
1232       if (expanded1.size() == c->inputs_[1].size()) {
1233         Log(options_->info_log,
1234             "Expanding@%d %d+%d to %d+%d\n",
1235             level,
1236             int(c->inputs_[0].size()),
1237             int(c->inputs_[1].size()),
1238             int(expanded0.size()),
1239             int(expanded1.size()));
1240         smallest = new_start;
1241         largest = new_limit;
1242         c->inputs_[0] = expanded0;
1243         c->inputs_[1] = expanded1;
1244         GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1245       }
1246     }
1247   }
1248
1249   // Compute the set of grandparent files that overlap this compaction
1250   // (parent == level+1; grandparent == level+2)
1251   if (level + 2 < config::kNumLevels) {
1252     current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
1253                                    &c->grandparents_);
1254   }
1255
1256   if (false) {
1257     Log(options_->info_log, "Compacting %d '%s' .. '%s'",
1258         level,
1259         smallest.DebugString().c_str(),
1260         largest.DebugString().c_str());
1261   }
1262
1263   // Update the place where we will do the next compaction for this level.
1264   // We update this immediately instead of waiting for the VersionEdit
1265   // to be applied so that if the compaction fails, we will try a different
1266   // key range next time.
1267   compact_pointer_[level] = largest.Encode().ToString();
1268   c->edit_.SetCompactPointer(level, largest);
1269 }
1270
1271 Compaction* VersionSet::CompactRange(
1272     int level,
1273     const InternalKey* begin,
1274     const InternalKey* end) {
1275   std::vector<FileMetaData*> inputs;
1276   current_->GetOverlappingInputs(level, begin, end, &inputs);
1277   if (inputs.empty()) {
1278     return NULL;
1279   }
1280
1281   // Avoid compacting too much in one shot in case the range is large.
1282   const uint64_t limit = MaxFileSizeForLevel(level);
1283   uint64_t total = 0;
1284   for (int i = 0; i < inputs.size(); i++) {
1285     uint64_t s = inputs[i]->file_size;
1286     total += s;
1287     if (total >= limit) {
1288       inputs.resize(i + 1);
1289       break;
1290     }
1291   }
1292
1293   Compaction* c = new Compaction(level);
1294   c->input_version_ = current_;
1295   c->input_version_->Ref();
1296   c->inputs_[0] = inputs;
1297   SetupOtherInputs(c);
1298   return c;
1299 }
1300
1301 Compaction::Compaction(int level)
1302     : level_(level),
1303       max_output_file_size_(MaxFileSizeForLevel(level)),
1304       input_version_(NULL),
1305       grandparent_index_(0),
1306       seen_key_(false),
1307       overlapped_bytes_(0) {
1308   for (int i = 0; i < config::kNumLevels; i++) {
1309     level_ptrs_[i] = 0;
1310   }
1311 }
1312
1313 Compaction::~Compaction() {
1314   if (input_version_ != NULL) {
1315     input_version_->Unref();
1316   }
1317 }
1318
1319 bool Compaction::IsTrivialMove() const {
1320   // Avoid a move if there is lots of overlapping grandparent data.
1321   // Otherwise, the move could create a parent file that will require
1322   // a very expensive merge later on.
1323   return (num_input_files(0) == 1 &&
1324           num_input_files(1) == 0 &&
1325           TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
1326 }
1327
1328 void Compaction::AddInputDeletions(VersionEdit* edit) {
1329   for (int which = 0; which < 2; which++) {
1330     for (size_t i = 0; i < inputs_[which].size(); i++) {
1331       edit->DeleteFile(level_ + which, inputs_[which][i]->number);
1332     }
1333   }
1334 }
1335
1336 bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
1337   // Maybe use binary search to find right entry instead of linear search?
1338   const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
1339   for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
1340     const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
1341     for (; level_ptrs_[lvl] < files.size(); ) {
1342       FileMetaData* f = files[level_ptrs_[lvl]];
1343       if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
1344         // We've advanced far enough
1345         if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
1346           // Key falls in this file's range, so definitely not base level
1347           return false;
1348         }
1349         break;
1350       }
1351       level_ptrs_[lvl]++;
1352     }
1353   }
1354   return true;
1355 }
1356
1357 bool Compaction::ShouldStopBefore(const Slice& internal_key) {
1358   // Scan to find earliest grandparent file that contains key.
1359   const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
1360   while (grandparent_index_ < grandparents_.size() &&
1361       icmp->Compare(internal_key,
1362                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
1363     if (seen_key_) {
1364       overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
1365     }
1366     grandparent_index_++;
1367   }
1368   seen_key_ = true;
1369
1370   if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
1371     // Too much overlap for current output; start new output
1372     overlapped_bytes_ = 0;
1373     return true;
1374   } else {
1375     return false;
1376   }
1377 }
1378
1379 void Compaction::ReleaseInputs() {
1380   if (input_version_ != NULL) {
1381     input_version_->Unref();
1382     input_version_ = NULL;
1383   }
1384 }
1385
1386 }  // namespace leveldb