tizen beta release
[platform/upstream/leveldb.git] / db / repair.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 //
5 // We recover the contents of the descriptor from the other files we find.
6 // (1) Any log files are first converted to tables
7 // (2) We scan every table to compute
8 //     (a) smallest/largest for the table
9 //     (b) largest sequence number in the table
10 // (3) We generate descriptor contents:
11 //      - log number is set to zero
12 //      - next-file-number is set to 1 + largest file number we found
13 //      - last-sequence-number is set to largest sequence# found across
14 //        all tables (see 2c)
15 //      - compaction pointers are cleared
16 //      - every table file is added at level 0
17 //
18 // Possible optimization 1:
19 //   (a) Compute total size and use to pick appropriate max-level M
20 //   (b) Sort tables by largest sequence# in the table
21 //   (c) For each table: if it overlaps earlier table, place in level-0,
22 //       else place in level-M.
23 // Possible optimization 2:
24 //   Store per-table metadata (smallest, largest, largest-seq#, ...)
25 //   in the table's meta section to speed up ScanTable.
26
27 #include "db/builder.h"
28 #include "db/db_impl.h"
29 #include "db/dbformat.h"
30 #include "db/filename.h"
31 #include "db/log_reader.h"
32 #include "db/log_writer.h"
33 #include "db/memtable.h"
34 #include "db/table_cache.h"
35 #include "db/version_edit.h"
36 #include "db/write_batch_internal.h"
37 #include "leveldb/comparator.h"
38 #include "leveldb/db.h"
39 #include "leveldb/env.h"
40
41 namespace leveldb {
42
43 namespace {
44
45 class Repairer {
46  public:
47   Repairer(const std::string& dbname, const Options& options)
48       : dbname_(dbname),
49         env_(options.env),
50         icmp_(options.comparator),
51         options_(SanitizeOptions(dbname, &icmp_, options)),
52         owns_info_log_(options_.info_log != options.info_log),
53         owns_cache_(options_.block_cache != options.block_cache),
54         next_file_number_(1) {
55     // TableCache can be small since we expect each table to be opened once.
56     table_cache_ = new TableCache(dbname_, &options_, 10);
57   }
58
59   ~Repairer() {
60     delete table_cache_;
61     if (owns_info_log_) {
62       delete options_.info_log;
63     }
64     if (owns_cache_) {
65       delete options_.block_cache;
66     }
67   }
68
69   Status Run() {
70     Status status = FindFiles();
71     if (status.ok()) {
72       ConvertLogFilesToTables();
73       ExtractMetaData();
74       status = WriteDescriptor();
75     }
76     if (status.ok()) {
77       unsigned long long bytes = 0;
78       for (size_t i = 0; i < tables_.size(); i++) {
79         bytes += tables_[i].meta.file_size;
80       }
81       Log(options_.info_log,
82           "**** Repaired leveldb %s; "
83           "recovered %d files; %llu bytes. "
84           "Some data may have been lost. "
85           "****",
86           dbname_.c_str(),
87           static_cast<int>(tables_.size()),
88           bytes);
89     }
90     return status;
91   }
92
93  private:
94   struct TableInfo {
95     FileMetaData meta;
96     SequenceNumber max_sequence;
97   };
98
99   std::string const dbname_;
100   Env* const env_;
101   InternalKeyComparator const icmp_;
102   Options const options_;
103   bool owns_info_log_;
104   bool owns_cache_;
105   TableCache* table_cache_;
106   VersionEdit edit_;
107
108   std::vector<std::string> manifests_;
109   std::vector<uint64_t> table_numbers_;
110   std::vector<uint64_t> logs_;
111   std::vector<TableInfo> tables_;
112   uint64_t next_file_number_;
113
114   Status FindFiles() {
115     std::vector<std::string> filenames;
116     Status status = env_->GetChildren(dbname_, &filenames);
117     if (!status.ok()) {
118       return status;
119     }
120     if (filenames.empty()) {
121       return Status::IOError(dbname_, "repair found no files");
122     }
123
124     uint64_t number;
125     FileType type;
126     for (size_t i = 0; i < filenames.size(); i++) {
127       if (ParseFileName(filenames[i], &number, &type)) {
128         if (type == kDescriptorFile) {
129           manifests_.push_back(filenames[i]);
130         } else {
131           if (number + 1 > next_file_number_) {
132             next_file_number_ = number + 1;
133           }
134           if (type == kLogFile) {
135             logs_.push_back(number);
136           } else if (type == kTableFile) {
137             table_numbers_.push_back(number);
138           } else {
139             // Ignore other files
140           }
141         }
142       }
143     }
144     return status;
145   }
146
147   void ConvertLogFilesToTables() {
148     for (size_t i = 0; i < logs_.size(); i++) {
149       std::string logname = LogFileName(dbname_, logs_[i]);
150       Status status = ConvertLogToTable(logs_[i]);
151       if (!status.ok()) {
152         Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
153             (unsigned long long) logs_[i],
154             status.ToString().c_str());
155       }
156       ArchiveFile(logname);
157     }
158   }
159
160   Status ConvertLogToTable(uint64_t log) {
161     struct LogReporter : public log::Reader::Reporter {
162       Env* env;
163       Logger* info_log;
164       uint64_t lognum;
165       virtual void Corruption(size_t bytes, const Status& s) {
166         // We print error messages for corruption, but continue repairing.
167         Log(info_log, "Log #%llu: dropping %d bytes; %s",
168             (unsigned long long) lognum,
169             static_cast<int>(bytes),
170             s.ToString().c_str());
171       }
172     };
173
174     // Open the log file
175     std::string logname = LogFileName(dbname_, log);
176     SequentialFile* lfile;
177     Status status = env_->NewSequentialFile(logname, &lfile);
178     if (!status.ok()) {
179       return status;
180     }
181
182     // Create the log reader.
183     LogReporter reporter;
184     reporter.env = env_;
185     reporter.info_log = options_.info_log;
186     reporter.lognum = log;
187     // We intentially make log::Reader do checksumming so that
188     // corruptions cause entire commits to be skipped instead of
189     // propagating bad information (like overly large sequence
190     // numbers).
191     log::Reader reader(lfile, &reporter, false/*do not checksum*/,
192                        0/*initial_offset*/);
193
194     // Read all the records and add to a memtable
195     std::string scratch;
196     Slice record;
197     WriteBatch batch;
198     MemTable* mem = new MemTable(icmp_);
199     mem->Ref();
200     int counter = 0;
201     while (reader.ReadRecord(&record, &scratch)) {
202       if (record.size() < 12) {
203         reporter.Corruption(
204             record.size(), Status::Corruption("log record too small"));
205         continue;
206       }
207       WriteBatchInternal::SetContents(&batch, record);
208       status = WriteBatchInternal::InsertInto(&batch, mem);
209       if (status.ok()) {
210         counter += WriteBatchInternal::Count(&batch);
211       } else {
212         Log(options_.info_log, "Log #%llu: ignoring %s",
213             (unsigned long long) log,
214             status.ToString().c_str());
215         status = Status::OK();  // Keep going with rest of file
216       }
217     }
218     delete lfile;
219
220     // Do not record a version edit for this conversion to a Table
221     // since ExtractMetaData() will also generate edits.
222     FileMetaData meta;
223     meta.number = next_file_number_++;
224     Iterator* iter = mem->NewIterator();
225     status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
226     delete iter;
227     mem->Unref();
228     mem = NULL;
229     if (status.ok()) {
230       if (meta.file_size > 0) {
231         table_numbers_.push_back(meta.number);
232       }
233     }
234     Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
235         (unsigned long long) log,
236         counter,
237         (unsigned long long) meta.number,
238         status.ToString().c_str());
239     return status;
240   }
241
242   void ExtractMetaData() {
243     std::vector<TableInfo> kept;
244     for (size_t i = 0; i < table_numbers_.size(); i++) {
245       TableInfo t;
246       t.meta.number = table_numbers_[i];
247       Status status = ScanTable(&t);
248       if (!status.ok()) {
249         std::string fname = TableFileName(dbname_, table_numbers_[i]);
250         Log(options_.info_log, "Table #%llu: ignoring %s",
251             (unsigned long long) table_numbers_[i],
252             status.ToString().c_str());
253         ArchiveFile(fname);
254       } else {
255         tables_.push_back(t);
256       }
257     }
258   }
259
260   Status ScanTable(TableInfo* t) {
261     std::string fname = TableFileName(dbname_, t->meta.number);
262     int counter = 0;
263     Status status = env_->GetFileSize(fname, &t->meta.file_size);
264     if (status.ok()) {
265       Iterator* iter = table_cache_->NewIterator(
266           ReadOptions(), t->meta.number, t->meta.file_size);
267       bool empty = true;
268       ParsedInternalKey parsed;
269       t->max_sequence = 0;
270       for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
271         Slice key = iter->key();
272         if (!ParseInternalKey(key, &parsed)) {
273           Log(options_.info_log, "Table #%llu: unparsable key %s",
274               (unsigned long long) t->meta.number,
275               EscapeString(key).c_str());
276           continue;
277         }
278
279         counter++;
280         if (empty) {
281           empty = false;
282           t->meta.smallest.DecodeFrom(key);
283         }
284         t->meta.largest.DecodeFrom(key);
285         if (parsed.sequence > t->max_sequence) {
286           t->max_sequence = parsed.sequence;
287         }
288       }
289       if (!iter->status().ok()) {
290         status = iter->status();
291       }
292       delete iter;
293     }
294     Log(options_.info_log, "Table #%llu: %d entries %s",
295         (unsigned long long) t->meta.number,
296         counter,
297         status.ToString().c_str());
298     return status;
299   }
300
301   Status WriteDescriptor() {
302     std::string tmp = TempFileName(dbname_, 1);
303     WritableFile* file;
304     Status status = env_->NewWritableFile(tmp, &file);
305     if (!status.ok()) {
306       return status;
307     }
308
309     SequenceNumber max_sequence = 0;
310     for (size_t i = 0; i < tables_.size(); i++) {
311       if (max_sequence < tables_[i].max_sequence) {
312         max_sequence = tables_[i].max_sequence;
313       }
314     }
315
316     edit_.SetComparatorName(icmp_.user_comparator()->Name());
317     edit_.SetLogNumber(0);
318     edit_.SetNextFile(next_file_number_);
319     edit_.SetLastSequence(max_sequence);
320
321     for (size_t i = 0; i < tables_.size(); i++) {
322       // TODO(opt): separate out into multiple levels
323       const TableInfo& t = tables_[i];
324       edit_.AddFile(0, t.meta.number, t.meta.file_size,
325                     t.meta.smallest, t.meta.largest);
326     }
327
328     //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
329     {
330       log::Writer log(file);
331       std::string record;
332       edit_.EncodeTo(&record);
333       status = log.AddRecord(record);
334     }
335     if (status.ok()) {
336       status = file->Close();
337     }
338     delete file;
339     file = NULL;
340
341     if (!status.ok()) {
342       env_->DeleteFile(tmp);
343     } else {
344       // Discard older manifests
345       for (size_t i = 0; i < manifests_.size(); i++) {
346         ArchiveFile(dbname_ + "/" + manifests_[i]);
347       }
348
349       // Install new manifest
350       status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
351       if (status.ok()) {
352         status = SetCurrentFile(env_, dbname_, 1);
353       } else {
354         env_->DeleteFile(tmp);
355       }
356     }
357     return status;
358   }
359
360   void ArchiveFile(const std::string& fname) {
361     // Move into another directory.  E.g., for
362     //    dir/foo
363     // rename to
364     //    dir/lost/foo
365     const char* slash = strrchr(fname.c_str(), '/');
366     std::string new_dir;
367     if (slash != NULL) {
368       new_dir.assign(fname.data(), slash - fname.data());
369     }
370     new_dir.append("/lost");
371     env_->CreateDir(new_dir);  // Ignore error
372     std::string new_file = new_dir;
373     new_file.append("/");
374     new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
375     Status s = env_->RenameFile(fname, new_file);
376     Log(options_.info_log, "Archiving %s: %s\n",
377         fname.c_str(), s.ToString().c_str());
378   }
379 };
380 }  // namespace
381
382 Status RepairDB(const std::string& dbname, const Options& options) {
383   Repairer repairer(dbname, options);
384   return repairer.Run();
385 }
386
387 }  // namespace leveldb