1310aeb43ad35f504527251f4d474aae4413e907
[platform/upstream/leveldb.git] / db / version_set.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/version_set.h"
6
7 #include <algorithm>
8 #include <stdio.h>
9 #include "db/filename.h"
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "db/memtable.h"
13 #include "db/table_cache.h"
14 #include "leveldb/env.h"
15 #include "leveldb/table_builder.h"
16 #include "table/merger.h"
17 #include "table/two_level_iterator.h"
18 #include "util/coding.h"
19 #include "util/logging.h"
20
21 namespace leveldb {
22
23 static const int kTargetFileSize = 2 * 1048576;
24
25 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
26 // stop building a single file in a level->level+1 compaction.
27 static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
28
29 // Maximum number of bytes in all compacted files.  We avoid expanding
30 // the lower level file set of a compaction if it would make the
31 // total compaction cover more than this many bytes.
32 static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
33
34 static double MaxBytesForLevel(int level) {
35   // Note: the result for level zero is not really used since we set
36   // the level-0 compaction threshold based on number of files.
37   double result = 10 * 1048576.0;  // Result for both level-0 and level-1
38   while (level > 1) {
39     result *= 10;
40     level--;
41   }
42   return result;
43 }
44
45 static uint64_t MaxFileSizeForLevel(int level) {
46   return kTargetFileSize;  // We could vary per level to reduce number of files?
47 }
48
49 static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
50   int64_t sum = 0;
51   for (size_t i = 0; i < files.size(); i++) {
52     sum += files[i]->file_size;
53   }
54   return sum;
55 }
56
57 namespace {
58 std::string IntSetToString(const std::set<uint64_t>& s) {
59   std::string result = "{";
60   for (std::set<uint64_t>::const_iterator it = s.begin();
61        it != s.end();
62        ++it) {
63     result += (result.size() > 1) ? "," : "";
64     result += NumberToString(*it);
65   }
66   result += "}";
67   return result;
68 }
69 }  // namespace
70
71 Version::~Version() {
72   assert(refs_ == 0);
73
74   // Remove from linked list
75   prev_->next_ = next_;
76   next_->prev_ = prev_;
77
78   // Drop references to files
79   for (int level = 0; level < config::kNumLevels; level++) {
80     for (size_t i = 0; i < files_[level].size(); i++) {
81       FileMetaData* f = files_[level][i];
82       assert(f->refs > 0);
83       f->refs--;
84       if (f->refs <= 0) {
85         delete f;
86       }
87     }
88   }
89 }
90
91 int FindFile(const InternalKeyComparator& icmp,
92              const std::vector<FileMetaData*>& files,
93              const Slice& key) {
94   uint32_t left = 0;
95   uint32_t right = files.size();
96   while (left < right) {
97     uint32_t mid = (left + right) / 2;
98     const FileMetaData* f = files[mid];
99     if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
100       // Key at "mid.largest" is < "target".  Therefore all
101       // files at or before "mid" are uninteresting.
102       left = mid + 1;
103     } else {
104       // Key at "mid.largest" is >= "target".  Therefore all files
105       // after "mid" are uninteresting.
106       right = mid;
107     }
108   }
109   return right;
110 }
111
112 static bool AfterFile(const Comparator* ucmp,
113                       const Slice* user_key, const FileMetaData* f) {
114   // NULL user_key occurs before all keys and is therefore never after *f
115   return (user_key != NULL &&
116           ucmp->Compare(*user_key, f->largest.user_key()) > 0);
117 }
118
119 static bool BeforeFile(const Comparator* ucmp,
120                        const Slice* user_key, const FileMetaData* f) {
121   // NULL user_key occurs after all keys and is therefore never before *f
122   return (user_key != NULL &&
123           ucmp->Compare(*user_key, f->smallest.user_key()) < 0);
124 }
125
126 bool SomeFileOverlapsRange(
127     const InternalKeyComparator& icmp,
128     bool disjoint_sorted_files,
129     const std::vector<FileMetaData*>& files,
130     const Slice* smallest_user_key,
131     const Slice* largest_user_key) {
132   const Comparator* ucmp = icmp.user_comparator();
133   if (!disjoint_sorted_files) {
134     // Need to check against all files
135     for (int i = 0; i < files.size(); i++) {
136       const FileMetaData* f = files[i];
137       if (AfterFile(ucmp, smallest_user_key, f) ||
138           BeforeFile(ucmp, largest_user_key, f)) {
139         // No overlap
140       } else {
141         return true;  // Overlap
142       }
143     }
144     return false;
145   }
146
147   // Binary search over file list
148   uint32_t index = 0;
149   if (smallest_user_key != NULL) {
150     // Find the earliest possible internal key for smallest_user_key
151     InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
152     index = FindFile(icmp, files, small.Encode());
153   }
154
155   if (index >= files.size()) {
156     // beginning of range is after all files, so no overlap.
157     return false;
158   }
159
160   return !BeforeFile(ucmp, largest_user_key, files[index]);
161 }
162
163 // An internal iterator.  For a given version/level pair, yields
164 // information about the files in the level.  For a given entry, key()
165 // is the largest key that occurs in the file, and value() is an
166 // 16-byte value containing the file number and file size, both
167 // encoded using EncodeFixed64.
168 class Version::LevelFileNumIterator : public Iterator {
169  public:
170   LevelFileNumIterator(const InternalKeyComparator& icmp,
171                        const std::vector<FileMetaData*>* flist)
172       : icmp_(icmp),
173         flist_(flist),
174         index_(flist->size()) {        // Marks as invalid
175   }
176   virtual bool Valid() const {
177     return index_ < flist_->size();
178   }
179   virtual void Seek(const Slice& target) {
180     index_ = FindFile(icmp_, *flist_, target);
181   }
182   virtual void SeekToFirst() { index_ = 0; }
183   virtual void SeekToLast() {
184     index_ = flist_->empty() ? 0 : flist_->size() - 1;
185   }
186   virtual void Next() {
187     assert(Valid());
188     index_++;
189   }
190   virtual void Prev() {
191     assert(Valid());
192     if (index_ == 0) {
193       index_ = flist_->size();  // Marks as invalid
194     } else {
195       index_--;
196     }
197   }
198   Slice key() const {
199     assert(Valid());
200     return (*flist_)[index_]->largest.Encode();
201   }
202   Slice value() const {
203     assert(Valid());
204     EncodeFixed64(value_buf_, (*flist_)[index_]->number);
205     EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
206     return Slice(value_buf_, sizeof(value_buf_));
207   }
208   virtual Status status() const { return Status::OK(); }
209  private:
210   const InternalKeyComparator icmp_;
211   const std::vector<FileMetaData*>* const flist_;
212   uint32_t index_;
213
214   // Backing store for value().  Holds the file number and size.
215   mutable char value_buf_[16];
216 };
217
218 static Iterator* GetFileIterator(void* arg,
219                                  const ReadOptions& options,
220                                  const Slice& file_value) {
221   TableCache* cache = reinterpret_cast<TableCache*>(arg);
222   if (file_value.size() != 16) {
223     return NewErrorIterator(
224         Status::Corruption("FileReader invoked with unexpected value"));
225   } else {
226     return cache->NewIterator(options,
227                               DecodeFixed64(file_value.data()),
228                               DecodeFixed64(file_value.data() + 8));
229   }
230 }
231
232 Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
233                                             int level) const {
234   return NewTwoLevelIterator(
235       new LevelFileNumIterator(vset_->icmp_, &files_[level]),
236       &GetFileIterator, vset_->table_cache_, options);
237 }
238
239 void Version::AddIterators(const ReadOptions& options,
240                            std::vector<Iterator*>* iters) {
241   // Merge all level zero files together since they may overlap
242   for (size_t i = 0; i < files_[0].size(); i++) {
243     iters->push_back(
244         vset_->table_cache_->NewIterator(
245             options, files_[0][i]->number, files_[0][i]->file_size));
246   }
247
248   // For levels > 0, we can use a concatenating iterator that sequentially
249   // walks through the non-overlapping files in the level, opening them
250   // lazily.
251   for (int level = 1; level < config::kNumLevels; level++) {
252     if (!files_[level].empty()) {
253       iters->push_back(NewConcatenatingIterator(options, level));
254     }
255   }
256 }
257
258 // If "*iter" points at a value or deletion for user_key, store
259 // either the value, or a NotFound error and return true.
260 // Else return false.
261 static bool GetValue(const Comparator* cmp,
262                      Iterator* iter, const Slice& user_key,
263                      std::string* value,
264                      Status* s) {
265   if (!iter->Valid()) {
266     return false;
267   }
268   ParsedInternalKey parsed_key;
269   if (!ParseInternalKey(iter->key(), &parsed_key)) {
270     *s = Status::Corruption("corrupted key for ", user_key);
271     return true;
272   }
273   if (cmp->Compare(parsed_key.user_key, user_key) != 0) {
274     return false;
275   }
276   switch (parsed_key.type) {
277     case kTypeDeletion:
278       *s = Status::NotFound(Slice());  // Use an empty error message for speed
279       break;
280     case kTypeValue: {
281       Slice v = iter->value();
282       value->assign(v.data(), v.size());
283       break;
284     }
285   }
286   return true;
287 }
288
289 static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
290   return a->number > b->number;
291 }
292
293 Status Version::Get(const ReadOptions& options,
294                     const LookupKey& k,
295                     std::string* value,
296                     GetStats* stats) {
297   Slice ikey = k.internal_key();
298   Slice user_key = k.user_key();
299   const Comparator* ucmp = vset_->icmp_.user_comparator();
300   Status s;
301
302   stats->seek_file = NULL;
303   stats->seek_file_level = -1;
304   FileMetaData* last_file_read = NULL;
305   int last_file_read_level = -1;
306
307   // We can search level-by-level since entries never hop across
308   // levels.  Therefore we are guaranteed that if we find data
309   // in an smaller level, later levels are irrelevant.
310   std::vector<FileMetaData*> tmp;
311   FileMetaData* tmp2;
312   for (int level = 0; level < config::kNumLevels; level++) {
313     size_t num_files = files_[level].size();
314     if (num_files == 0) continue;
315
316     // Get the list of files to search in this level
317     FileMetaData* const* files = &files_[level][0];
318     if (level == 0) {
319       // Level-0 files may overlap each other.  Find all files that
320       // overlap user_key and process them in order from newest to oldest.
321       tmp.reserve(num_files);
322       for (uint32_t i = 0; i < num_files; i++) {
323         FileMetaData* f = files[i];
324         if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
325             ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
326           tmp.push_back(f);
327         }
328       }
329       if (tmp.empty()) continue;
330
331       std::sort(tmp.begin(), tmp.end(), NewestFirst);
332       files = &tmp[0];
333       num_files = tmp.size();
334     } else {
335       // Binary search to find earliest index whose largest key >= ikey.
336       uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
337       if (index >= num_files) {
338         files = NULL;
339         num_files = 0;
340       } else {
341         tmp2 = files[index];
342         if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
343           // All of "tmp2" is past any data for user_key
344           files = NULL;
345           num_files = 0;
346         } else {
347           files = &tmp2;
348           num_files = 1;
349         }
350       }
351     }
352
353     for (uint32_t i = 0; i < num_files; ++i) {
354       if (last_file_read != NULL && stats->seek_file == NULL) {
355         // We have had more than one seek for this read.  Charge the 1st file.
356         stats->seek_file = last_file_read;
357         stats->seek_file_level = last_file_read_level;
358       }
359
360       FileMetaData* f = files[i];
361       last_file_read = f;
362       last_file_read_level = level;
363
364       Iterator* iter = vset_->table_cache_->NewIterator(
365           options,
366           f->number,
367           f->file_size);
368       iter->Seek(ikey);
369       const bool done = GetValue(ucmp, iter, user_key, value, &s);
370       if (!iter->status().ok()) {
371         s = iter->status();
372         delete iter;
373         return s;
374       } else {
375         delete iter;
376         if (done) {
377           return s;
378         }
379       }
380     }
381   }
382
383   return Status::NotFound(Slice());  // Use an empty error message for speed
384 }
385
386 bool Version::UpdateStats(const GetStats& stats) {
387   FileMetaData* f = stats.seek_file;
388   if (f != NULL) {
389     f->allowed_seeks--;
390     if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
391       file_to_compact_ = f;
392       file_to_compact_level_ = stats.seek_file_level;
393       return true;
394     }
395   }
396   return false;
397 }
398
399 void Version::Ref() {
400   ++refs_;
401 }
402
403 void Version::Unref() {
404   assert(this != &vset_->dummy_versions_);
405   assert(refs_ >= 1);
406   --refs_;
407   if (refs_ == 0) {
408     delete this;
409   }
410 }
411
412 bool Version::OverlapInLevel(int level,
413                              const Slice* smallest_user_key,
414                              const Slice* largest_user_key) {
415   return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
416                                smallest_user_key, largest_user_key);
417 }
418
419 int Version::PickLevelForMemTableOutput(
420     const Slice& smallest_user_key,
421     const Slice& largest_user_key) {
422   int level = 0;
423   if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
424     // Push to next level if there is no overlap in next level,
425     // and the #bytes overlapping in the level after that are limited.
426     InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
427     InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
428     std::vector<FileMetaData*> overlaps;
429     while (level < config::kMaxMemCompactLevel) {
430       if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
431         break;
432       }
433       GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
434       const int64_t sum = TotalFileSize(overlaps);
435       if (sum > kMaxGrandParentOverlapBytes) {
436         break;
437       }
438       level++;
439     }
440   }
441   return level;
442 }
443
444 // Store in "*inputs" all files in "level" that overlap [begin,end]
445 void Version::GetOverlappingInputs(
446     int level,
447     const InternalKey* begin,
448     const InternalKey* end,
449     std::vector<FileMetaData*>* inputs) {
450   inputs->clear();
451   Slice user_begin, user_end;
452   if (begin != NULL) {
453     user_begin = begin->user_key();
454   }
455   if (end != NULL) {
456     user_end = end->user_key();
457   }
458   const Comparator* user_cmp = vset_->icmp_.user_comparator();
459   for (size_t i = 0; i < files_[level].size(); ) {
460     FileMetaData* f = files_[level][i++];
461     const Slice file_start = f->smallest.user_key();
462     const Slice file_limit = f->largest.user_key();
463     if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
464       // "f" is completely before specified range; skip it
465     } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
466       // "f" is completely after specified range; skip it
467     } else {
468       inputs->push_back(f);
469       if (level == 0) {
470         // Level-0 files may overlap each other.  So check if the newly
471         // added file has expanded the range.  If so, restart search.
472         if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
473           user_begin = file_start;
474           inputs->clear();
475           i = 0;
476         } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
477           user_end = file_limit;
478           inputs->clear();
479           i = 0;
480         }
481       }
482     }
483   }
484 }
485
486 std::string Version::DebugString() const {
487   std::string r;
488   for (int level = 0; level < config::kNumLevels; level++) {
489     // E.g.,
490     //   --- level 1 ---
491     //   17:123['a' .. 'd']
492     //   20:43['e' .. 'g']
493     r.append("--- level ");
494     AppendNumberTo(&r, level);
495     r.append(" ---\n");
496     const std::vector<FileMetaData*>& files = files_[level];
497     for (size_t i = 0; i < files.size(); i++) {
498       r.push_back(' ');
499       AppendNumberTo(&r, files[i]->number);
500       r.push_back(':');
501       AppendNumberTo(&r, files[i]->file_size);
502       r.append("[");
503       r.append(files[i]->smallest.DebugString());
504       r.append(" .. ");
505       r.append(files[i]->largest.DebugString());
506       r.append("]\n");
507     }
508   }
509   return r;
510 }
511
512 // A helper class so we can efficiently apply a whole sequence
513 // of edits to a particular state without creating intermediate
514 // Versions that contain full copies of the intermediate state.
515 class VersionSet::Builder {
516  private:
517   // Helper to sort by v->files_[file_number].smallest
518   struct BySmallestKey {
519     const InternalKeyComparator* internal_comparator;
520
521     bool operator()(FileMetaData* f1, FileMetaData* f2) const {
522       int r = internal_comparator->Compare(f1->smallest, f2->smallest);
523       if (r != 0) {
524         return (r < 0);
525       } else {
526         // Break ties by file number
527         return (f1->number < f2->number);
528       }
529     }
530   };
531
532   typedef std::set<FileMetaData*, BySmallestKey> FileSet;
533   struct LevelState {
534     std::set<uint64_t> deleted_files;
535     FileSet* added_files;
536   };
537
538   VersionSet* vset_;
539   Version* base_;
540   LevelState levels_[config::kNumLevels];
541
542  public:
543   // Initialize a builder with the files from *base and other info from *vset
544   Builder(VersionSet* vset, Version* base)
545       : vset_(vset),
546         base_(base) {
547     base_->Ref();
548     BySmallestKey cmp;
549     cmp.internal_comparator = &vset_->icmp_;
550     for (int level = 0; level < config::kNumLevels; level++) {
551       levels_[level].added_files = new FileSet(cmp);
552     }
553   }
554
555   ~Builder() {
556     for (int level = 0; level < config::kNumLevels; level++) {
557       const FileSet* added = levels_[level].added_files;
558       std::vector<FileMetaData*> to_unref;
559       to_unref.reserve(added->size());
560       for (FileSet::const_iterator it = added->begin();
561           it != added->end(); ++it) {
562         to_unref.push_back(*it);
563       }
564       delete added;
565       for (uint32_t i = 0; i < to_unref.size(); i++) {
566         FileMetaData* f = to_unref[i];
567         f->refs--;
568         if (f->refs <= 0) {
569           delete f;
570         }
571       }
572     }
573     base_->Unref();
574   }
575
576   // Apply all of the edits in *edit to the current state.
577   void Apply(VersionEdit* edit) {
578     // Update compaction pointers
579     for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
580       const int level = edit->compact_pointers_[i].first;
581       vset_->compact_pointer_[level] =
582           edit->compact_pointers_[i].second.Encode().ToString();
583     }
584
585     // Delete files
586     const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
587     for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
588          iter != del.end();
589          ++iter) {
590       const int level = iter->first;
591       const uint64_t number = iter->second;
592       levels_[level].deleted_files.insert(number);
593     }
594
595     // Add new files
596     for (size_t i = 0; i < edit->new_files_.size(); i++) {
597       const int level = edit->new_files_[i].first;
598       FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
599       f->refs = 1;
600
601       // We arrange to automatically compact this file after
602       // a certain number of seeks.  Let's assume:
603       //   (1) One seek costs 10ms
604       //   (2) Writing or reading 1MB costs 10ms (100MB/s)
605       //   (3) A compaction of 1MB does 25MB of IO:
606       //         1MB read from this level
607       //         10-12MB read from next level (boundaries may be misaligned)
608       //         10-12MB written to next level
609       // This implies that 25 seeks cost the same as the compaction
610       // of 1MB of data.  I.e., one seek costs approximately the
611       // same as the compaction of 40KB of data.  We are a little
612       // conservative and allow approximately one seek for every 16KB
613       // of data before triggering a compaction.
614       f->allowed_seeks = (f->file_size / 16384);
615       if (f->allowed_seeks < 100) f->allowed_seeks = 100;
616
617       levels_[level].deleted_files.erase(f->number);
618       levels_[level].added_files->insert(f);
619     }
620   }
621
622   // Save the current state in *v.
623   void SaveTo(Version* v) {
624     BySmallestKey cmp;
625     cmp.internal_comparator = &vset_->icmp_;
626     for (int level = 0; level < config::kNumLevels; level++) {
627       // Merge the set of added files with the set of pre-existing files.
628       // Drop any deleted files.  Store the result in *v.
629       const std::vector<FileMetaData*>& base_files = base_->files_[level];
630       std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
631       std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
632       const FileSet* added = levels_[level].added_files;
633       v->files_[level].reserve(base_files.size() + added->size());
634       for (FileSet::const_iterator added_iter = added->begin();
635            added_iter != added->end();
636            ++added_iter) {
637         // Add all smaller files listed in base_
638         for (std::vector<FileMetaData*>::const_iterator bpos
639                  = std::upper_bound(base_iter, base_end, *added_iter, cmp);
640              base_iter != bpos;
641              ++base_iter) {
642           MaybeAddFile(v, level, *base_iter);
643         }
644
645         MaybeAddFile(v, level, *added_iter);
646       }
647
648       // Add remaining base files
649       for (; base_iter != base_end; ++base_iter) {
650         MaybeAddFile(v, level, *base_iter);
651       }
652
653 #ifndef NDEBUG
654       // Make sure there is no overlap in levels > 0
655       if (level > 0) {
656         for (uint32_t i = 1; i < v->files_[level].size(); i++) {
657           const InternalKey& prev_end = v->files_[level][i-1]->largest;
658           const InternalKey& this_begin = v->files_[level][i]->smallest;
659           if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
660             fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
661                     prev_end.DebugString().c_str(),
662                     this_begin.DebugString().c_str());
663             abort();
664           }
665         }
666       }
667 #endif
668     }
669   }
670
671   void MaybeAddFile(Version* v, int level, FileMetaData* f) {
672     if (levels_[level].deleted_files.count(f->number) > 0) {
673       // File is deleted: do nothing
674     } else {
675       std::vector<FileMetaData*>* files = &v->files_[level];
676       if (level > 0 && !files->empty()) {
677         // Must not overlap
678         assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
679                                     f->smallest) < 0);
680       }
681       f->refs++;
682       files->push_back(f);
683     }
684   }
685 };
686
687 VersionSet::VersionSet(const std::string& dbname,
688                        const Options* options,
689                        TableCache* table_cache,
690                        const InternalKeyComparator* cmp)
691     : env_(options->env),
692       dbname_(dbname),
693       options_(options),
694       table_cache_(table_cache),
695       icmp_(*cmp),
696       next_file_number_(2),
697       manifest_file_number_(0),  // Filled by Recover()
698       last_sequence_(0),
699       log_number_(0),
700       prev_log_number_(0),
701       descriptor_file_(NULL),
702       descriptor_log_(NULL),
703       dummy_versions_(this),
704       current_(NULL) {
705   AppendVersion(new Version(this));
706 }
707
708 VersionSet::~VersionSet() {
709   current_->Unref();
710   assert(dummy_versions_.next_ == &dummy_versions_);  // List must be empty
711   delete descriptor_log_;
712   delete descriptor_file_;
713 }
714
715 void VersionSet::AppendVersion(Version* v) {
716   // Make "v" current
717   assert(v->refs_ == 0);
718   assert(v != current_);
719   if (current_ != NULL) {
720     current_->Unref();
721   }
722   current_ = v;
723   v->Ref();
724
725   // Append to linked list
726   v->prev_ = dummy_versions_.prev_;
727   v->next_ = &dummy_versions_;
728   v->prev_->next_ = v;
729   v->next_->prev_ = v;
730 }
731
732 Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
733   if (edit->has_log_number_) {
734     assert(edit->log_number_ >= log_number_);
735     assert(edit->log_number_ < next_file_number_);
736   } else {
737     edit->SetLogNumber(log_number_);
738   }
739
740   if (!edit->has_prev_log_number_) {
741     edit->SetPrevLogNumber(prev_log_number_);
742   }
743
744   edit->SetNextFile(next_file_number_);
745   edit->SetLastSequence(last_sequence_);
746
747   Version* v = new Version(this);
748   {
749     Builder builder(this, current_);
750     builder.Apply(edit);
751     builder.SaveTo(v);
752   }
753   Finalize(v);
754
755   // Initialize new descriptor log file if necessary by creating
756   // a temporary file that contains a snapshot of the current version.
757   std::string new_manifest_file;
758   Status s;
759   if (descriptor_log_ == NULL) {
760     // No reason to unlock *mu here since we only hit this path in the
761     // first call to LogAndApply (when opening the database).
762     assert(descriptor_file_ == NULL);
763     new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
764     edit->SetNextFile(next_file_number_);
765     s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
766     if (s.ok()) {
767       descriptor_log_ = new log::Writer(descriptor_file_);
768       s = WriteSnapshot(descriptor_log_);
769     }
770   }
771
772   // Unlock during expensive MANIFEST log write
773   {
774     mu->Unlock();
775
776     // Write new record to MANIFEST log
777     if (s.ok()) {
778       std::string record;
779       edit->EncodeTo(&record);
780       s = descriptor_log_->AddRecord(record);
781       if (s.ok()) {
782         s = descriptor_file_->Sync();
783       }
784     }
785
786     // If we just created a new descriptor file, install it by writing a
787     // new CURRENT file that points to it.
788     if (s.ok() && !new_manifest_file.empty()) {
789       s = SetCurrentFile(env_, dbname_, manifest_file_number_);
790     }
791
792     mu->Lock();
793   }
794
795   // Install the new version
796   if (s.ok()) {
797     AppendVersion(v);
798     log_number_ = edit->log_number_;
799     prev_log_number_ = edit->prev_log_number_;
800   } else {
801     delete v;
802     if (!new_manifest_file.empty()) {
803       delete descriptor_log_;
804       delete descriptor_file_;
805       descriptor_log_ = NULL;
806       descriptor_file_ = NULL;
807       env_->DeleteFile(new_manifest_file);
808     }
809   }
810
811   return s;
812 }
813
814 Status VersionSet::Recover() {
815   struct LogReporter : public log::Reader::Reporter {
816     Status* status;
817     virtual void Corruption(size_t bytes, const Status& s) {
818       if (this->status->ok()) *this->status = s;
819     }
820   };
821
822   // Read "CURRENT" file, which contains a pointer to the current manifest file
823   std::string current;
824   Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
825   if (!s.ok()) {
826     return s;
827   }
828   if (current.empty() || current[current.size()-1] != '\n') {
829     return Status::Corruption("CURRENT file does not end with newline");
830   }
831   current.resize(current.size() - 1);
832
833   std::string dscname = dbname_ + "/" + current;
834   SequentialFile* file;
835   s = env_->NewSequentialFile(dscname, &file);
836   if (!s.ok()) {
837     return s;
838   }
839
840   bool have_log_number = false;
841   bool have_prev_log_number = false;
842   bool have_next_file = false;
843   bool have_last_sequence = false;
844   uint64_t next_file = 0;
845   uint64_t last_sequence = 0;
846   uint64_t log_number = 0;
847   uint64_t prev_log_number = 0;
848   Builder builder(this, current_);
849
850   {
851     LogReporter reporter;
852     reporter.status = &s;
853     log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
854     Slice record;
855     std::string scratch;
856     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
857       VersionEdit edit;
858       s = edit.DecodeFrom(record);
859       if (s.ok()) {
860         if (edit.has_comparator_ &&
861             edit.comparator_ != icmp_.user_comparator()->Name()) {
862           s = Status::InvalidArgument(
863               edit.comparator_ + "does not match existing comparator ",
864               icmp_.user_comparator()->Name());
865         }
866       }
867
868       if (s.ok()) {
869         builder.Apply(&edit);
870       }
871
872       if (edit.has_log_number_) {
873         log_number = edit.log_number_;
874         have_log_number = true;
875       }
876
877       if (edit.has_prev_log_number_) {
878         prev_log_number = edit.prev_log_number_;
879         have_prev_log_number = true;
880       }
881
882       if (edit.has_next_file_number_) {
883         next_file = edit.next_file_number_;
884         have_next_file = true;
885       }
886
887       if (edit.has_last_sequence_) {
888         last_sequence = edit.last_sequence_;
889         have_last_sequence = true;
890       }
891     }
892   }
893   delete file;
894   file = NULL;
895
896   if (s.ok()) {
897     if (!have_next_file) {
898       s = Status::Corruption("no meta-nextfile entry in descriptor");
899     } else if (!have_log_number) {
900       s = Status::Corruption("no meta-lognumber entry in descriptor");
901     } else if (!have_last_sequence) {
902       s = Status::Corruption("no last-sequence-number entry in descriptor");
903     }
904
905     if (!have_prev_log_number) {
906       prev_log_number = 0;
907     }
908
909     MarkFileNumberUsed(prev_log_number);
910     MarkFileNumberUsed(log_number);
911   }
912
913   if (s.ok()) {
914     Version* v = new Version(this);
915     builder.SaveTo(v);
916     // Install recovered version
917     Finalize(v);
918     AppendVersion(v);
919     manifest_file_number_ = next_file;
920     next_file_number_ = next_file + 1;
921     last_sequence_ = last_sequence;
922     log_number_ = log_number;
923     prev_log_number_ = prev_log_number;
924   }
925
926   return s;
927 }
928
929 void VersionSet::MarkFileNumberUsed(uint64_t number) {
930   if (next_file_number_ <= number) {
931     next_file_number_ = number + 1;
932   }
933 }
934
935 void VersionSet::Finalize(Version* v) {
936   // Precomputed best level for next compaction
937   int best_level = -1;
938   double best_score = -1;
939
940   for (int level = 0; level < config::kNumLevels-1; level++) {
941     double score;
942     if (level == 0) {
943       // We treat level-0 specially by bounding the number of files
944       // instead of number of bytes for two reasons:
945       //
946       // (1) With larger write-buffer sizes, it is nice not to do too
947       // many level-0 compactions.
948       //
949       // (2) The files in level-0 are merged on every read and
950       // therefore we wish to avoid too many files when the individual
951       // file size is small (perhaps because of a small write-buffer
952       // setting, or very high compression ratios, or lots of
953       // overwrites/deletions).
954       score = v->files_[level].size() /
955           static_cast<double>(config::kL0_CompactionTrigger);
956     } else {
957       // Compute the ratio of current size to size limit.
958       const uint64_t level_bytes = TotalFileSize(v->files_[level]);
959       score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
960     }
961
962     if (score > best_score) {
963       best_level = level;
964       best_score = score;
965     }
966   }
967
968   v->compaction_level_ = best_level;
969   v->compaction_score_ = best_score;
970 }
971
972 Status VersionSet::WriteSnapshot(log::Writer* log) {
973   // TODO: Break up into multiple records to reduce memory usage on recovery?
974
975   // Save metadata
976   VersionEdit edit;
977   edit.SetComparatorName(icmp_.user_comparator()->Name());
978
979   // Save compaction pointers
980   for (int level = 0; level < config::kNumLevels; level++) {
981     if (!compact_pointer_[level].empty()) {
982       InternalKey key;
983       key.DecodeFrom(compact_pointer_[level]);
984       edit.SetCompactPointer(level, key);
985     }
986   }
987
988   // Save files
989   for (int level = 0; level < config::kNumLevels; level++) {
990     const std::vector<FileMetaData*>& files = current_->files_[level];
991     for (size_t i = 0; i < files.size(); i++) {
992       const FileMetaData* f = files[i];
993       edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
994     }
995   }
996
997   std::string record;
998   edit.EncodeTo(&record);
999   return log->AddRecord(record);
1000 }
1001
1002 int VersionSet::NumLevelFiles(int level) const {
1003   assert(level >= 0);
1004   assert(level < config::kNumLevels);
1005   return current_->files_[level].size();
1006 }
1007
1008 const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
1009   // Update code if kNumLevels changes
1010   assert(config::kNumLevels == 7);
1011   snprintf(scratch->buffer, sizeof(scratch->buffer),
1012            "files[ %d %d %d %d %d %d %d ]",
1013            int(current_->files_[0].size()),
1014            int(current_->files_[1].size()),
1015            int(current_->files_[2].size()),
1016            int(current_->files_[3].size()),
1017            int(current_->files_[4].size()),
1018            int(current_->files_[5].size()),
1019            int(current_->files_[6].size()));
1020   return scratch->buffer;
1021 }
1022
1023 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
1024   uint64_t result = 0;
1025   for (int level = 0; level < config::kNumLevels; level++) {
1026     const std::vector<FileMetaData*>& files = v->files_[level];
1027     for (size_t i = 0; i < files.size(); i++) {
1028       if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
1029         // Entire file is before "ikey", so just add the file size
1030         result += files[i]->file_size;
1031       } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
1032         // Entire file is after "ikey", so ignore
1033         if (level > 0) {
1034           // Files other than level 0 are sorted by meta->smallest, so
1035           // no further files in this level will contain data for
1036           // "ikey".
1037           break;
1038         }
1039       } else {
1040         // "ikey" falls in the range for this table.  Add the
1041         // approximate offset of "ikey" within the table.
1042         Table* tableptr;
1043         Iterator* iter = table_cache_->NewIterator(
1044             ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
1045         if (tableptr != NULL) {
1046           result += tableptr->ApproximateOffsetOf(ikey.Encode());
1047         }
1048         delete iter;
1049       }
1050     }
1051   }
1052   return result;
1053 }
1054
1055 void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
1056   for (Version* v = dummy_versions_.next_;
1057        v != &dummy_versions_;
1058        v = v->next_) {
1059     for (int level = 0; level < config::kNumLevels; level++) {
1060       const std::vector<FileMetaData*>& files = v->files_[level];
1061       for (size_t i = 0; i < files.size(); i++) {
1062         live->insert(files[i]->number);
1063       }
1064     }
1065   }
1066 }
1067
1068 int64_t VersionSet::NumLevelBytes(int level) const {
1069   assert(level >= 0);
1070   assert(level < config::kNumLevels);
1071   return TotalFileSize(current_->files_[level]);
1072 }
1073
1074 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
1075   int64_t result = 0;
1076   std::vector<FileMetaData*> overlaps;
1077   for (int level = 1; level < config::kNumLevels - 1; level++) {
1078     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1079       const FileMetaData* f = current_->files_[level][i];
1080       current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
1081                                      &overlaps);
1082       const int64_t sum = TotalFileSize(overlaps);
1083       if (sum > result) {
1084         result = sum;
1085       }
1086     }
1087   }
1088   return result;
1089 }
1090
1091 // Stores the minimal range that covers all entries in inputs in
1092 // *smallest, *largest.
1093 // REQUIRES: inputs is not empty
1094 void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
1095                           InternalKey* smallest,
1096                           InternalKey* largest) {
1097   assert(!inputs.empty());
1098   smallest->Clear();
1099   largest->Clear();
1100   for (size_t i = 0; i < inputs.size(); i++) {
1101     FileMetaData* f = inputs[i];
1102     if (i == 0) {
1103       *smallest = f->smallest;
1104       *largest = f->largest;
1105     } else {
1106       if (icmp_.Compare(f->smallest, *smallest) < 0) {
1107         *smallest = f->smallest;
1108       }
1109       if (icmp_.Compare(f->largest, *largest) > 0) {
1110         *largest = f->largest;
1111       }
1112     }
1113   }
1114 }
1115
1116 // Stores the minimal range that covers all entries in inputs1 and inputs2
1117 // in *smallest, *largest.
1118 // REQUIRES: inputs is not empty
1119 void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
1120                            const std::vector<FileMetaData*>& inputs2,
1121                            InternalKey* smallest,
1122                            InternalKey* largest) {
1123   std::vector<FileMetaData*> all = inputs1;
1124   all.insert(all.end(), inputs2.begin(), inputs2.end());
1125   GetRange(all, smallest, largest);
1126 }
1127
1128 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
1129   ReadOptions options;
1130   options.verify_checksums = options_->paranoid_checks;
1131   options.fill_cache = false;
1132
1133   // Level-0 files have to be merged together.  For other levels,
1134   // we will make a concatenating iterator per level.
1135   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
1136   const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
1137   Iterator** list = new Iterator*[space];
1138   int num = 0;
1139   for (int which = 0; which < 2; which++) {
1140     if (!c->inputs_[which].empty()) {
1141       if (c->level() + which == 0) {
1142         const std::vector<FileMetaData*>& files = c->inputs_[which];
1143         for (size_t i = 0; i < files.size(); i++) {
1144           list[num++] = table_cache_->NewIterator(
1145               options, files[i]->number, files[i]->file_size);
1146         }
1147       } else {
1148         // Create concatenating iterator for the files from this level
1149         list[num++] = NewTwoLevelIterator(
1150             new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
1151             &GetFileIterator, table_cache_, options);
1152       }
1153     }
1154   }
1155   assert(num <= space);
1156   Iterator* result = NewMergingIterator(&icmp_, list, num);
1157   delete[] list;
1158   return result;
1159 }
1160
1161 Compaction* VersionSet::PickCompaction() {
1162   Compaction* c;
1163   int level;
1164
1165   // We prefer compactions triggered by too much data in a level over
1166   // the compactions triggered by seeks.
1167   const bool size_compaction = (current_->compaction_score_ >= 1);
1168   const bool seek_compaction = (current_->file_to_compact_ != NULL);
1169   if (size_compaction) {
1170     level = current_->compaction_level_;
1171     assert(level >= 0);
1172     assert(level+1 < config::kNumLevels);
1173     c = new Compaction(level);
1174
1175     // Pick the first file that comes after compact_pointer_[level]
1176     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1177       FileMetaData* f = current_->files_[level][i];
1178       if (compact_pointer_[level].empty() ||
1179           icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
1180         c->inputs_[0].push_back(f);
1181         break;
1182       }
1183     }
1184     if (c->inputs_[0].empty()) {
1185       // Wrap-around to the beginning of the key space
1186       c->inputs_[0].push_back(current_->files_[level][0]);
1187     }
1188   } else if (seek_compaction) {
1189     level = current_->file_to_compact_level_;
1190     c = new Compaction(level);
1191     c->inputs_[0].push_back(current_->file_to_compact_);
1192   } else {
1193     return NULL;
1194   }
1195
1196   c->input_version_ = current_;
1197   c->input_version_->Ref();
1198
1199   // Files in level 0 may overlap each other, so pick up all overlapping ones
1200   if (level == 0) {
1201     InternalKey smallest, largest;
1202     GetRange(c->inputs_[0], &smallest, &largest);
1203     // Note that the next call will discard the file we placed in
1204     // c->inputs_[0] earlier and replace it with an overlapping set
1205     // which will include the picked file.
1206     current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
1207     assert(!c->inputs_[0].empty());
1208   }
1209
1210   SetupOtherInputs(c);
1211
1212   return c;
1213 }
1214
1215 void VersionSet::SetupOtherInputs(Compaction* c) {
1216   const int level = c->level();
1217   InternalKey smallest, largest;
1218   GetRange(c->inputs_[0], &smallest, &largest);
1219
1220   current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
1221
1222   // Get entire range covered by compaction
1223   InternalKey all_start, all_limit;
1224   GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1225
1226   // See if we can grow the number of inputs in "level" without
1227   // changing the number of "level+1" files we pick up.
1228   if (!c->inputs_[1].empty()) {
1229     std::vector<FileMetaData*> expanded0;
1230     current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
1231     const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
1232     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
1233     const int64_t expanded0_size = TotalFileSize(expanded0);
1234     if (expanded0.size() > c->inputs_[0].size() &&
1235         inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
1236       InternalKey new_start, new_limit;
1237       GetRange(expanded0, &new_start, &new_limit);
1238       std::vector<FileMetaData*> expanded1;
1239       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
1240                                      &expanded1);
1241       if (expanded1.size() == c->inputs_[1].size()) {
1242         Log(options_->info_log,
1243             "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
1244             level,
1245             int(c->inputs_[0].size()),
1246             int(c->inputs_[1].size()),
1247             long(inputs0_size), long(inputs1_size),
1248             int(expanded0.size()),
1249             int(expanded1.size()),
1250             long(expanded0_size), long(inputs1_size));
1251         smallest = new_start;
1252         largest = new_limit;
1253         c->inputs_[0] = expanded0;
1254         c->inputs_[1] = expanded1;
1255         GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1256       }
1257     }
1258   }
1259
1260   // Compute the set of grandparent files that overlap this compaction
1261   // (parent == level+1; grandparent == level+2)
1262   if (level + 2 < config::kNumLevels) {
1263     current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
1264                                    &c->grandparents_);
1265   }
1266
1267   if (false) {
1268     Log(options_->info_log, "Compacting %d '%s' .. '%s'",
1269         level,
1270         smallest.DebugString().c_str(),
1271         largest.DebugString().c_str());
1272   }
1273
1274   // Update the place where we will do the next compaction for this level.
1275   // We update this immediately instead of waiting for the VersionEdit
1276   // to be applied so that if the compaction fails, we will try a different
1277   // key range next time.
1278   compact_pointer_[level] = largest.Encode().ToString();
1279   c->edit_.SetCompactPointer(level, largest);
1280 }
1281
1282 Compaction* VersionSet::CompactRange(
1283     int level,
1284     const InternalKey* begin,
1285     const InternalKey* end) {
1286   std::vector<FileMetaData*> inputs;
1287   current_->GetOverlappingInputs(level, begin, end, &inputs);
1288   if (inputs.empty()) {
1289     return NULL;
1290   }
1291
1292   // Avoid compacting too much in one shot in case the range is large.
1293   const uint64_t limit = MaxFileSizeForLevel(level);
1294   uint64_t total = 0;
1295   for (int i = 0; i < inputs.size(); i++) {
1296     uint64_t s = inputs[i]->file_size;
1297     total += s;
1298     if (total >= limit) {
1299       inputs.resize(i + 1);
1300       break;
1301     }
1302   }
1303
1304   Compaction* c = new Compaction(level);
1305   c->input_version_ = current_;
1306   c->input_version_->Ref();
1307   c->inputs_[0] = inputs;
1308   SetupOtherInputs(c);
1309   return c;
1310 }
1311
1312 Compaction::Compaction(int level)
1313     : level_(level),
1314       max_output_file_size_(MaxFileSizeForLevel(level)),
1315       input_version_(NULL),
1316       grandparent_index_(0),
1317       seen_key_(false),
1318       overlapped_bytes_(0) {
1319   for (int i = 0; i < config::kNumLevels; i++) {
1320     level_ptrs_[i] = 0;
1321   }
1322 }
1323
1324 Compaction::~Compaction() {
1325   if (input_version_ != NULL) {
1326     input_version_->Unref();
1327   }
1328 }
1329
1330 bool Compaction::IsTrivialMove() const {
1331   // Avoid a move if there is lots of overlapping grandparent data.
1332   // Otherwise, the move could create a parent file that will require
1333   // a very expensive merge later on.
1334   return (num_input_files(0) == 1 &&
1335           num_input_files(1) == 0 &&
1336           TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
1337 }
1338
1339 void Compaction::AddInputDeletions(VersionEdit* edit) {
1340   for (int which = 0; which < 2; which++) {
1341     for (size_t i = 0; i < inputs_[which].size(); i++) {
1342       edit->DeleteFile(level_ + which, inputs_[which][i]->number);
1343     }
1344   }
1345 }
1346
1347 bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
1348   // Maybe use binary search to find right entry instead of linear search?
1349   const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
1350   for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
1351     const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
1352     for (; level_ptrs_[lvl] < files.size(); ) {
1353       FileMetaData* f = files[level_ptrs_[lvl]];
1354       if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
1355         // We've advanced far enough
1356         if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
1357           // Key falls in this file's range, so definitely not base level
1358           return false;
1359         }
1360         break;
1361       }
1362       level_ptrs_[lvl]++;
1363     }
1364   }
1365   return true;
1366 }
1367
1368 bool Compaction::ShouldStopBefore(const Slice& internal_key) {
1369   // Scan to find earliest grandparent file that contains key.
1370   const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
1371   while (grandparent_index_ < grandparents_.size() &&
1372       icmp->Compare(internal_key,
1373                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
1374     if (seen_key_) {
1375       overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
1376     }
1377     grandparent_index_++;
1378   }
1379   seen_key_ = true;
1380
1381   if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
1382     // Too much overlap for current output; start new output
1383     overlapped_bytes_ = 0;
1384     return true;
1385   } else {
1386     return false;
1387   }
1388 }
1389
1390 void Compaction::ReleaseInputs() {
1391   if (input_version_ != NULL) {
1392     input_version_->Unref();
1393     input_version_ = NULL;
1394   }
1395 }
1396
1397 }  // namespace leveldb