From 18658f435759ac17386a404a3c963828e6cac2dc Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Tue, 4 Dec 2018 16:57:03 -0800 Subject: [PATCH] Universal Compaction with TTL --- db/compaction_picker.cc | 75 +++++++++++++------------- db/compaction_picker.h | 5 ++ db/compaction_picker_universal.cc | 88 +++++++++++++++++++++---------- db/compaction_picker_universal.h | 13 ++++- db/listener_test.cc | 30 +++++++++++ 5 files changed, 145 insertions(+), 66 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 6510d4bc0c9..2300c4566e4 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -996,6 +996,41 @@ void CompactionPicker::UnregisterCompaction(Compaction* c) { compactions_in_progress_.erase(c); } +void CompactionPicker::PickExpiredTtlFiles( + const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level, + int* output_level, CompactionInputFiles* start_level_inputs) { + if (vstorage->ExpiredTtlFiles().empty()) { + return; + } + + auto continuation = [&, cf_name](std::pair level_file) { + // If it's being compacted it has nothing to do here. + // If this assert() fails that means that some function marked some + // files as being_compacted, but didn't call ComputeCompactionScore() + assert(!level_file.second->being_compacted); + *start_level = level_file.first; + *output_level = + (*start_level == 0) ? vstorage->base_level() : *start_level + 1; + + if (*start_level == 0 && !level0_compactions_in_progress()->empty()) { + return false; + } + + start_level_inputs->files = {level_file.second}; + start_level_inputs->level = *start_level; + return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs); + }; + + for (auto& level_file : vstorage->ExpiredTtlFiles()) { + if (continuation(level_file)) { + // found the compaction! + return; + } + } + + start_level_inputs->files.clear(); +} + void CompactionPicker::PickFilesMarkedForCompaction( const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level, int* output_level, CompactionInputFiles* start_level_inputs) { @@ -1167,42 +1202,6 @@ class LevelCompactionBuilder { static const int kMinFilesForIntraL0Compaction = 4; }; -void LevelCompactionBuilder::PickExpiredTtlFiles() { - if (vstorage_->ExpiredTtlFiles().empty()) { - return; - } - - auto continuation = [&](std::pair level_file) { - // If it's being compacted it has nothing to do here. - // If this assert() fails that means that some function marked some - // files as being_compacted, but didn't call ComputeCompactionScore() - assert(!level_file.second->being_compacted); - start_level_ = level_file.first; - output_level_ = - (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1; - - if ((start_level_ == vstorage_->num_non_empty_levels() - 1) || - (start_level_ == 0 && - !compaction_picker_->level0_compactions_in_progress()->empty())) { - return false; - } - - start_level_inputs_.files = {level_file.second}; - start_level_inputs_.level = start_level_; - return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, - &start_level_inputs_); - }; - - for (auto& level_file : vstorage_->ExpiredTtlFiles()) { - if (continuation(level_file)) { - // found the compaction! - return; - } - } - - start_level_inputs_.files.clear(); -} - void LevelCompactionBuilder::SetupInitialFiles() { // Find the compactions by size on all levels. bool skipped_l0_to_base = false; @@ -1287,7 +1286,9 @@ void LevelCompactionBuilder::SetupInitialFiles() { } assert(start_level_inputs_.empty()); - PickExpiredTtlFiles(); + compaction_picker_->PickExpiredTtlFiles(cf_name_, vstorage_, &start_level_, + &output_level_, + &start_level_inputs_); if (!start_level_inputs_.empty()) { compaction_reason_ = CompactionReason::kTtl; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c60d792852d..8906f8fa179 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -181,6 +181,11 @@ class CompactionPicker { int* start_level, int* output_level, CompactionInputFiles* start_level_inputs); + void PickExpiredTtlFiles(const std::string& cf_name, + VersionStorageInfo* vstorage, int* start_level, + int* output_level, + CompactionInputFiles* start_level_inputs); + bool GetOverlappingL0Files(VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs, int output_level, int* parent_index); diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc index c0cf2b06221..c51daea6376 100644 --- a/db/compaction_picker_universal.cc +++ b/db/compaction_picker_universal.cc @@ -168,6 +168,9 @@ bool UniversalCompactionPicker::NeedsCompaction( if (!vstorage->FilesMarkedForCompaction().empty()) { return true; } + if (!vstorage->ExpiredTtlFiles().empty()) { + return true; + } return false; } @@ -266,11 +269,14 @@ Compaction* UniversalCompactionPicker::PickCompaction( (vstorage->FilesMarkedForCompaction().empty() && sorted_runs.size() < (unsigned int)mutable_cf_options .level0_file_num_compaction_trigger)) { - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n", - cf_name.c_str()); - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - nullptr); - return nullptr; + if ((mutable_cf_options.ttl == 0) || + (mutable_cf_options.ttl > 0 && vstorage->ExpiredTtlFiles().empty())) { + ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n", + cf_name.c_str()); + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionPicker::PickCompaction:Return", nullptr); + return nullptr; + } } VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER_MAX_SZ( @@ -337,15 +343,24 @@ Compaction* UniversalCompactionPicker::PickCompaction( } if (c == nullptr) { - if ((c = PickDeleteTriggeredCompaction(cf_name, mutable_cf_options, - vstorage, score, sorted_runs, - log_buffer)) != nullptr) { + if ((c = PickSomeTriggeredCompaction(cf_name, mutable_cf_options, vstorage, + score, sorted_runs, log_buffer, + MARKED_FILES)) != nullptr) { ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: delete triggered compaction\n", cf_name.c_str()); } } + if (c == nullptr) { + if ((c = PickSomeTriggeredCompaction(cf_name, mutable_cf_options, vstorage, + score, sorted_runs, log_buffer, + TTL)) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: ttl triggered compaction\n", + cf_name.c_str()); + } + } + if (c == nullptr) { TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", nullptr); @@ -783,12 +798,11 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( CompactionReason::kUniversalSizeAmplification); } -// Pick files marked for compaction. Typically, files are marked by -// CompactOnDeleteCollector due to the presence of tombstones. -Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( +Compaction* UniversalCompactionPicker::PickSomeTriggeredCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, double score, - const std::vector& /*sorted_runs*/, LogBuffer* /*log_buffer*/) { + const std::vector& /*sorted_runs*/, LogBuffer* /*log_buffer*/, + const TriggerStrategy strategy) { CompactionInputFiles start_level_inputs; int output_level; std::vector inputs; @@ -796,34 +810,46 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( if (vstorage->num_levels() == 1) { // This is single level universal. Since we're basically trying to reclaim // space by processing files marked for compaction due to high tombstone - // density, let's do the same thing as compaction to reduce size amp which - // has the same goals. - bool compact = false; + // density or ttl-expired files, let's do the same thing as compaction to + // reduce size amp which has the same goals. start_level_inputs.level = 0; start_level_inputs.files.clear(); output_level = 0; - for (FileMetaData* f : vstorage->LevelFiles(0)) { - if (f->marked_for_compaction) { - compact = true; + if (strategy == MARKED_FILES) { + bool compact = false; + for (FileMetaData* f : vstorage->LevelFiles(0)) { + if (f->marked_for_compaction) { + compact = true; + } + if (compact) { + start_level_inputs.files.push_back(f); + } } - if (compact) { - start_level_inputs.files.push_back(f); + } else if (strategy == TTL) { + for (std::pair f : vstorage->ExpiredTtlFiles()) { + start_level_inputs.files.push_back(f.second); } } if (start_level_inputs.size() <= 1) { // If only the last file in L0 is marked for compaction, ignore it return nullptr; } + inputs.push_back(start_level_inputs); } else { int start_level; // For multi-level universal, the strategy is to make this look more like - // leveled. We pick one of the files marked for compaction and compact with - // overlapping files in the adjacent level. - PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, &output_level, - &start_level_inputs); + // leveled. We pick one of the files marked for compaction or ttl-expired + // files and compact with overlapping files in the adjacent level. + if (strategy == MARKED_FILES) { + PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, + &output_level, &start_level_inputs); + } else if (strategy == TTL) { + PickExpiredTtlFiles(cf_name, vstorage, &start_level, &output_level, + &start_level_inputs); + } if (start_level_inputs.empty()) { return nullptr; } @@ -888,6 +914,14 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( for (FileMetaData* f : vstorage->LevelFiles(output_level)) { estimated_total_size += f->fd.GetFileSize(); } + + // defaults for marked-files (delete triggered) compaction + bool is_manual_compaction = true; + auto compaction_reason = CompactionReason::kFilesMarkedForCompaction; + if (strategy == TTL) { + is_manual_compaction = false; + compaction_reason = CompactionReason::kTtl; + } uint32_t path_id = GetPathId(ioptions_, mutable_cf_options, estimated_total_size); return new Compaction( @@ -898,10 +932,10 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), GetCompressionOptions(ioptions_, vstorage, output_level), - /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true, - score, false /* deletion_compaction */, - CompactionReason::kFilesMarkedForCompaction); + /* max_subcompactions */ 0, /* grandparents */ {}, is_manual_compaction, + score, false /* deletion_compaction */, compaction_reason); } + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/compaction_picker_universal.h b/db/compaction_picker_universal.h index 375e5998e25..efe1b4e355b 100644 --- a/db/compaction_picker_universal.h +++ b/db/compaction_picker_universal.h @@ -73,10 +73,19 @@ class UniversalCompactionPicker : public CompactionPicker { VersionStorageInfo* vstorage, double score, const std::vector& sorted_runs, LogBuffer* log_buffer); - Compaction* PickDeleteTriggeredCompaction( + enum TriggerStrategy { + // Pick files marked for compaction. Typically, files are marked by + // CompactOnDeletionCollector due to the presence of tombstones. + MARKED_FILES, + // Pick ttl expired files. + TTL + }; + + Compaction* PickSomeTriggeredCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer); + const std::vector& sorted_runs, LogBuffer* log_buffer, + const TriggerStrategy strategy); // Used in universal compaction when the enabled_trivial_move // option is set. Checks whether there are any overlapping files diff --git a/db/listener_test.cc b/db/listener_test.cc index cbbffc8cb75..a892645a5a1 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -551,7 +551,37 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) { ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification); } + options.level0_file_num_compaction_trigger = 8; + options.compaction_options_universal.max_size_amplification_percent = 100000; + options.compaction_options_universal.size_ratio = 100000; + options.ttl = 24 * 60 * 60; // 24 hours + env_->time_elapse_only_sleep_ = false; + options.env = env_; + env_->addon_time_.store(0); + + DestroyAndReopen(options); + listener->compaction_reasons_.clear(); + + // Write 2 files in L0 + for (int i = 0; i < 2; i++) { + GenerateNewRandomFile(&rnd); + } + + // forward time by 36 hours + env_->addon_time_.fetch_add(36 * 60 * 60); + for (int i = 0; i < 1; i++) { + GenerateNewRandomFile(&rnd); + } + dbfull()->TEST_WaitForCompact(); + + ASSERT_GT(listener->compaction_reasons_.size(), 0); + for (auto compaction_reason : listener->compaction_reasons_) { + ASSERT_EQ(compaction_reason, CompactionReason::kTtl); + } + options.disable_auto_compactions = true; + options.compaction_options_universal.max_size_amplification_percent = 1; + options.ttl = 0; Close(); listener->compaction_reasons_.clear(); Reopen(options);