Skip to content

Commit

Permalink
LevelDB now attempts to reuse the preceding MANIFEST and log file whe…
Browse files Browse the repository at this point in the history
…n re-opened.

(Based on a suggestion by cmumford.)

"open" benchmark on my workstation speeds up significantly since we
can now avoid three fdatasync calls and a compaction per open:

  Before: ~80000 microseconds
  After:    ~130 microseconds

Details:

(1) Added Options::reuse_logs (currently defaults to false) to control
new behavior.  The intention is to change the default to true after some
baking.

(2) Added Env::NewAppendableFile() whose default implementation returns
a not-supported error.

(3) VersionSet::Recovery attempts to reuse the MANIFEST from which
it is recovering.

(4) DBImpl recovery attempts to reuse the last log file and memtable.

(5) db_test.cc now tests a new configuration that sets reuse_logs to true.

(6) fault_injection_test also tests a reuse_logs==true config.

(7) Added a new recovery_test.
  • Loading branch information
ghemawat authored and cmumford committed Aug 11, 2015
1 parent 77948e7 commit ac1d69d
Show file tree
Hide file tree
Showing 22 changed files with 707 additions and 155 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ TESTS = \
issue200_test \
log_test \
memenv_test \
recovery_test \
skiplist_test \
table_test \
version_edit_test \
Expand Down Expand Up @@ -177,6 +178,9 @@ issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

recovery_test: db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

Expand Down
2 changes: 1 addition & 1 deletion db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CorruptionTest {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_;
options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test";
dbname_ = test::TmpDir() + "/corruption_test";
DestroyDB(dbname_, options_);

db_ = NULL;
Expand Down
7 changes: 7 additions & 0 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ static int FLAGS_bloom_bits = -1;
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs = false;

// Use the db with the following name.
static const char* FLAGS_db = NULL;

Expand Down Expand Up @@ -700,6 +703,7 @@ class Benchmark {
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_open_files = FLAGS_open_files;
options.filter_policy = filter_policy_;
options.reuse_logs = FLAGS_reuse_logs;
Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
Expand Down Expand Up @@ -954,6 +958,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_use_existing_db = n;
} else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_reuse_logs = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
Expand Down
178 changes: 109 additions & 69 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_)),
mem_(NULL),
imm_(NULL),
logfile_(NULL),
logfile_number_(0),
Expand All @@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
mem_->Ref();
has_imm_.Release_Store(NULL);

// Reserve ten files or so for other uses and give the rest to TableCache.
Expand Down Expand Up @@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
}

Status DBImpl::Recover(VersionEdit* edit) {
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
mutex_.AssertHeld();

// Ignore error from CreateDir since the creation of the DB is
Expand Down Expand Up @@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}

s = versions_->Recover();
if (s.ok()) {
SequenceNumber max_sequence(0);

// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);

// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);

// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
}
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return s;
return Status::OK();
}

Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Expand Down Expand Up @@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
Expand Down Expand Up @@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}

if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, NULL);
mem->Unref();
mem = NULL;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
mem->Unref();
mem = NULL;
}
}

if (status.ok() && mem != NULL) {
status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
delete file;

// See if we should keep reusing the last log file.
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == NULL);
assert(log_ == NULL);
assert(mem_ == NULL);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != NULL) {
mem_ = mem;
mem = NULL;
} else {
// mem can be NULL if lognum exists but was empty.
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}

if (mem != NULL) {
// mem did not get reused; compact it.
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, NULL);
}
mem->Unref();
}

if (mem != NULL) mem->Unref();
delete file;
return status;
}

Expand Down Expand Up @@ -1449,8 +1479,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == NULL) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
Expand All @@ -1460,15 +1493,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != NULL);
*dbptr = impl;
} else {
delete impl;
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status Recover(VersionEdit* edit, bool* save_manifest)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

void MaybeIgnoreError(Status* s) const;

Expand All @@ -90,9 +91,8 @@ class DBImpl : public DB {
// Errors are recorded in bg_error_.
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence)
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
VersionEdit* edit, SequenceNumber* max_sequence)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Expand Down
21 changes: 20 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class DBTest {
// Sequence of option configurations to try
enum OptionConfig {
kDefault,
kReuse,
kFilter,
kUncompressed,
kEnd
Expand Down Expand Up @@ -237,7 +238,11 @@ class DBTest {
// Return the current option configuration.
Options CurrentOptions() {
Options options;
options.reuse_logs = false;
switch (option_config_) {
case kReuse:
options.reuse_logs = true;
break;
case kFilter:
options.filter_policy = filter_policy_;
break;
Expand Down Expand Up @@ -1080,6 +1085,14 @@ TEST(DBTest, ApproximateSizes) {
// 0 because GetApproximateSizes() does not account for memtable space
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));

if (options.reuse_logs) {
// Recovery will reuse memtable, and GetApproximateSizes() does not
// account for memtable usage;
Reopen(&options);
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
continue;
}

// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
Expand Down Expand Up @@ -1123,6 +1136,11 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));

if (options.reuse_logs) {
// Need to force a memtable compaction since recovery does not do so.
ASSERT_OK(dbfull()->TEST_CompactMemTable());
}

// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
Expand Down Expand Up @@ -2084,7 +2102,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKeyComparator cmp(BytewiseComparator());
Options options;
VersionSet vset(dbname, &options, NULL, &cmp);
ASSERT_OK(vset.Recover());
bool save_manifest;
ASSERT_OK(vset.Recover(&save_manifest));
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
Expand Down
Loading

0 comments on commit ac1d69d

Please sign in to comment.