Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Universal Compaction with TTL #4749

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 38 additions & 37 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,41 @@ void CompactionPicker::UnregisterCompaction(Compaction* c) {
compactions_in_progress_.erase(c);
}

void CompactionPicker::PickExpiredTtlFiles(
const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
int* output_level, CompactionInputFiles* start_level_inputs) {
if (vstorage->ExpiredTtlFiles().empty()) {
return;
}

auto continuation = [&, cf_name](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
*start_level = level_file.first;
*output_level =
(*start_level == 0) ? vstorage->base_level() : *start_level + 1;

if (*start_level == 0 && !level0_compactions_in_progress()->empty()) {
return false;
}

start_level_inputs->files = {level_file.second};
start_level_inputs->level = *start_level;
return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs);
};

for (auto& level_file : vstorage->ExpiredTtlFiles()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}

start_level_inputs->files.clear();
}

void CompactionPicker::PickFilesMarkedForCompaction(
const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
int* output_level, CompactionInputFiles* start_level_inputs) {
Expand Down Expand Up @@ -1167,42 +1202,6 @@ class LevelCompactionBuilder {
static const int kMinFilesForIntraL0Compaction = 4;
};

void LevelCompactionBuilder::PickExpiredTtlFiles() {
if (vstorage_->ExpiredTtlFiles().empty()) {
return;
}

auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
start_level_ = level_file.first;
output_level_ =
(start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;

if ((start_level_ == vstorage_->num_non_empty_levels() - 1) ||
(start_level_ == 0 &&
!compaction_picker_->level0_compactions_in_progress()->empty())) {
return false;
}

start_level_inputs_.files = {level_file.second};
start_level_inputs_.level = start_level_;
return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_);
};

for (auto& level_file : vstorage_->ExpiredTtlFiles()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}

start_level_inputs_.files.clear();
}

void LevelCompactionBuilder::SetupInitialFiles() {
// Find the compactions by size on all levels.
bool skipped_l0_to_base = false;
Expand Down Expand Up @@ -1287,7 +1286,9 @@ void LevelCompactionBuilder::SetupInitialFiles() {
}

assert(start_level_inputs_.empty());
PickExpiredTtlFiles();
compaction_picker_->PickExpiredTtlFiles(cf_name_, vstorage_, &start_level_,
&output_level_,
&start_level_inputs_);
if (!start_level_inputs_.empty()) {
compaction_reason_ = CompactionReason::kTtl;
}
Expand Down
5 changes: 5 additions & 0 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ class CompactionPicker {
int* start_level, int* output_level,
CompactionInputFiles* start_level_inputs);

void PickExpiredTtlFiles(const std::string& cf_name,
VersionStorageInfo* vstorage, int* start_level,
int* output_level,
CompactionInputFiles* start_level_inputs);

bool GetOverlappingL0Files(VersionStorageInfo* vstorage,
CompactionInputFiles* start_level_inputs,
int output_level, int* parent_index);
Expand Down
88 changes: 61 additions & 27 deletions db/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ bool UniversalCompactionPicker::NeedsCompaction(
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
if (!vstorage->ExpiredTtlFiles().empty()) {
return true;
}
return false;
}

Expand Down Expand Up @@ -266,11 +269,14 @@ Compaction* UniversalCompactionPicker::PickCompaction(
(vstorage->FilesMarkedForCompaction().empty() &&
sorted_runs.size() < (unsigned int)mutable_cf_options
.level0_file_num_compaction_trigger)) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
cf_name.c_str());
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
return nullptr;
if ((mutable_cf_options.ttl == 0) ||
(mutable_cf_options.ttl > 0 && vstorage->ExpiredTtlFiles().empty())) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
cf_name.c_str());
TEST_SYNC_POINT_CALLBACK(
"UniversalCompactionPicker::PickCompaction:Return", nullptr);
return nullptr;
}
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER_MAX_SZ(
Expand Down Expand Up @@ -337,15 +343,24 @@ Compaction* UniversalCompactionPicker::PickCompaction(
}

if (c == nullptr) {
if ((c = PickDeleteTriggeredCompaction(cf_name, mutable_cf_options,
vstorage, score, sorted_runs,
log_buffer)) != nullptr) {
if ((c = PickSomeTriggeredCompaction(cf_name, mutable_cf_options, vstorage,
score, sorted_runs, log_buffer,
MARKED_FILES)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: delete triggered compaction\n",
cf_name.c_str());
}
}

if (c == nullptr) {
if ((c = PickSomeTriggeredCompaction(cf_name, mutable_cf_options, vstorage,
score, sorted_runs, log_buffer,
TTL)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: ttl triggered compaction\n",
cf_name.c_str());
}
}

if (c == nullptr) {
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
Expand Down Expand Up @@ -783,47 +798,58 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
CompactionReason::kUniversalSizeAmplification);
}

// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
Compaction* UniversalCompactionPicker::PickSomeTriggeredCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& /*sorted_runs*/, LogBuffer* /*log_buffer*/) {
const std::vector<SortedRun>& /*sorted_runs*/, LogBuffer* /*log_buffer*/,
const TriggerStrategy strategy) {
CompactionInputFiles start_level_inputs;
int output_level;
std::vector<CompactionInputFiles> inputs;

if (vstorage->num_levels() == 1) {
// This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which
// has the same goals.
bool compact = false;
// density or ttl-expired files, let's do the same thing as compaction to
// reduce size amp which has the same goals.

start_level_inputs.level = 0;
start_level_inputs.files.clear();
output_level = 0;
for (FileMetaData* f : vstorage->LevelFiles(0)) {
if (f->marked_for_compaction) {
compact = true;
if (strategy == MARKED_FILES) {
bool compact = false;
for (FileMetaData* f : vstorage->LevelFiles(0)) {
if (f->marked_for_compaction) {
compact = true;
}
if (compact) {
start_level_inputs.files.push_back(f);
}
}
if (compact) {
start_level_inputs.files.push_back(f);
} else if (strategy == TTL) {
for (std::pair<int, FileMetaData*> f : vstorage->ExpiredTtlFiles()) {
start_level_inputs.files.push_back(f.second);
}
}
if (start_level_inputs.size() <= 1) {
// If only the last file in L0 is marked for compaction, ignore it
return nullptr;
}

inputs.push_back(start_level_inputs);
} else {
int start_level;

// For multi-level universal, the strategy is to make this look more like
// leveled. We pick one of the files marked for compaction and compact with
// overlapping files in the adjacent level.
PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, &output_level,
&start_level_inputs);
// leveled. We pick one of the files marked for compaction or ttl-expired
// files and compact with overlapping files in the adjacent level.
if (strategy == MARKED_FILES) {
PickFilesMarkedForCompaction(cf_name, vstorage, &start_level,
&output_level, &start_level_inputs);
} else if (strategy == TTL) {
PickExpiredTtlFiles(cf_name, vstorage, &start_level, &output_level,
&start_level_inputs);
}
if (start_level_inputs.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -888,6 +914,14 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
for (FileMetaData* f : vstorage->LevelFiles(output_level)) {
estimated_total_size += f->fd.GetFileSize();
}

// defaults for marked-files (delete triggered) compaction
bool is_manual_compaction = true;
auto compaction_reason = CompactionReason::kFilesMarkedForCompaction;
if (strategy == TTL) {
is_manual_compaction = false;
compaction_reason = CompactionReason::kTtl;
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options, estimated_total_size);
return new Compaction(
Expand All @@ -898,10 +932,10 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
1),
GetCompressionOptions(ioptions_, vstorage, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
score, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
/* max_subcompactions */ 0, /* grandparents */ {}, is_manual_compaction,
score, false /* deletion_compaction */, compaction_reason);
}

} // namespace rocksdb

#endif // !ROCKSDB_LITE
13 changes: 11 additions & 2 deletions db/compaction_picker_universal.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,19 @@ class UniversalCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);

Compaction* PickDeleteTriggeredCompaction(
enum TriggerStrategy {
// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeletionCollector due to the presence of tombstones.
MARKED_FILES,
// Pick ttl expired files.
TTL
};

Compaction* PickSomeTriggeredCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer,
const TriggerStrategy strategy);

// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
Expand Down
30 changes: 30 additions & 0 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,37 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) {
ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
}

options.level0_file_num_compaction_trigger = 8;
options.compaction_options_universal.max_size_amplification_percent = 100000;
options.compaction_options_universal.size_ratio = 100000;
options.ttl = 24 * 60 * 60; // 24 hours
env_->time_elapse_only_sleep_ = false;
options.env = env_;
env_->addon_time_.store(0);

DestroyAndReopen(options);
listener->compaction_reasons_.clear();

// Write 2 files in L0
for (int i = 0; i < 2; i++) {
GenerateNewRandomFile(&rnd);
}

// forward time by 36 hours
env_->addon_time_.fetch_add(36 * 60 * 60);
for (int i = 0; i < 1; i++) {
GenerateNewRandomFile(&rnd);
}
dbfull()->TEST_WaitForCompact();

ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) {
ASSERT_EQ(compaction_reason, CompactionReason::kTtl);
}

options.disable_auto_compactions = true;
options.compaction_options_universal.max_size_amplification_percent = 1;
options.ttl = 0;
Close();
listener->compaction_reasons_.clear();
Reopen(options);
Expand Down