Skip to content

Commit

Permalink
storage: add tombstone_retention_ms to compaction_config
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf committed Sep 6, 2024
1 parent 1ad0a0e commit 832a996
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 14 deletions.
3 changes: 3 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloud) {
model::timestamp::min(),
1,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down Expand Up @@ -516,6 +517,7 @@ TEST_P(CloudStorageEndToEndManualTest, TestTimequeryAfterArchivalGC) {
model::timestamp::min(),
1, // max_bytes_in_log
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down Expand Up @@ -809,6 +811,7 @@ TEST_P(EndToEndFixture, TestCloudStorageTimequery) {
model::timestamp::max(),
0,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
partition->log()->housekeeping(housekeeping_conf).get();
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/async_data_uploader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class async_data_uploader_fixture : public redpanda_thread_fixture {
model::timestamp::min(),
std::nullopt,
max_collect_offset,
std::nullopt,
ss::default_priority_class(),
as);

Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ struct reupload_fixture : public archiver_fixture {
model::timestamp::max(),
std::nullopt,
max_collectible,
std::nullopt,
ss::default_priority_class(),
abort_source})
.get();
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ FIXTURE_TEST(
model::timestamp::now(),
std::nullopt,
model::offset{999},
std::nullopt,
ss::default_priority_class(),
as))
.get0();
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/tests/manual_log_deletion_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct manual_deletion_fixture : public raft_test_fixture {
retention_timestamp,
100_MiB,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as,
storage::ntp_sanitizer_config{.sanitize_only = true}))
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/tests/tx_compaction_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class tx_executor {
model::timestamp::now().value() - ret_duration.count()),
std::nullopt,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
dummy_as,
})
Expand Down Expand Up @@ -228,6 +229,7 @@ class tx_executor {
model::timestamp::min(),
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as);
// Compacts until a single sealed segment remains, other than the
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/tests/group_tx_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ ss::future<> run_workload(
model::timestamp::max(),
std::nullopt,
log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
dummy_as,
})
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/tests/append_entries_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ FIXTURE_TEST(test_collected_log_recovery, raft_test_fixture) {
first_ts,
100_MiB,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as,
storage::ntp_sanitizer_config{.sanitize_only = true}))
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
collection_threshold,
_config.retention_bytes(),
current_log.handle->stm_manager()->max_collectible_offset(),
/*TODO: current_log.handle->config().tombstone_retention_ms()*/
std::nullopt,
_config.compaction_priority,
_abort_source,
std::move(ntp_sanitizer_cfg),
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/opfuzz/opfuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ struct compact_op final : opfuzz::op {
model::timestamp::max(),
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
*(ctx._as),
storage::ntp_sanitizer_config{.sanitize_only = true});
Expand Down
4 changes: 4 additions & 0 deletions src/v/storage/tests/compaction_e2e_multinode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ FIXTURE_TEST(replicate_after_compaction, compaction_multinode_test) {
model::timestamp::min(),
std::nullopt,
first_log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
first_log->housekeeping(conf).get();
Expand Down Expand Up @@ -125,6 +126,7 @@ FIXTURE_TEST(replicate_after_compaction, compaction_multinode_test) {
model::timestamp::min(),
std::nullopt,
new_log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
new_log->housekeeping(conf2).get();
Expand Down Expand Up @@ -197,6 +199,7 @@ FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) {
model::timestamp::min(),
std::nullopt,
first_log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
first_log->housekeeping(conf).get();
Expand Down Expand Up @@ -225,6 +228,7 @@ FIXTURE_TEST(compact_transactions_and_replicate, compaction_multinode_test) {
model::timestamp::min(),
std::nullopt,
new_log->stm_manager()->max_collectible_offset(),
std::nullopt,
ss::default_priority_class(),
as);
new_log->housekeeping(conf2).get();
Expand Down
7 changes: 7 additions & 0 deletions src/v/storage/tests/compaction_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ TEST_P(CompactionFixtureParamTest, TestDedupeOnePass) {
auto& disk_log = dynamic_cast<storage::disk_log_impl&>(*log);
storage::compaction_config cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand Down Expand Up @@ -239,6 +240,7 @@ TEST_F(CompactionFixtureTest, TestDedupeMultiPass) {
auto& disk_log = dynamic_cast<storage::disk_log_impl&>(*log);
storage::compaction_config cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand Down Expand Up @@ -281,6 +283,7 @@ TEST_P(CompactionFixtureBatchSizeParamTest, TestRecompactWithNewData) {
auto& disk_log = dynamic_cast<storage::disk_log_impl&>(*log);
storage::compaction_config cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand All @@ -300,6 +303,7 @@ TEST_P(CompactionFixtureBatchSizeParamTest, TestRecompactWithNewData) {
generate_data(1, cardinality, records_per_segment).get();
storage::compaction_config new_cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand Down Expand Up @@ -348,6 +352,7 @@ TEST_F(CompactionFixtureTest, TestCompactWithNonDataBatches) {
= disk_log.get_probe().get_segments_compacted();
storage::compaction_config new_cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt);
Expand Down Expand Up @@ -435,6 +440,7 @@ TEST_P(CompactionFilledReaderTest, ReadFilledGaps) {
// when reading.
storage::compaction_config cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand Down Expand Up @@ -493,6 +499,7 @@ TEST_F(CompactionFixtureTest, TestReadFilledGapsWithTerms) {

storage::compaction_config cfg(
disk_log.segments().back()->offsets().get_base_offset(),
std::nullopt,
ss::default_priority_class(),
never_abort,
std::nullopt,
Expand Down
13 changes: 10 additions & 3 deletions src/v/storage/tests/log_truncate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ FIXTURE_TEST(test_truncate_last_single_record_batch, storage_test_fixture) {
}

FIXTURE_TEST(
test_truncate_whole_log_when_logs_are_garbadge_collected,
test_truncate_whole_log_when_logs_are_garbage_collected,
storage_test_fixture) {
auto cfg = default_log_config(test_dir);
storage::log_manager mgr = make_log_manager(cfg);
Expand Down Expand Up @@ -364,6 +364,7 @@ FIXTURE_TEST(
ts,
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as))
.get0();
Expand Down Expand Up @@ -545,6 +546,7 @@ FIXTURE_TEST(test_concurrent_prefix_truncate_and_gc, storage_test_fixture) {
ts,
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as));

Expand Down Expand Up @@ -591,7 +593,7 @@ FIXTURE_TEST(test_concurrent_truncate_and_compaction, storage_test_fixture) {
// leaving room for further windowed compaction.
ss::abort_source as;
compaction_config compaction_cfg(
model::offset::max(), ss::default_priority_class(), as);
model::offset::max(), std::nullopt, ss::default_priority_class(), as);
auto& disk_log = *dynamic_cast<disk_log_impl*>(log.get());
disk_log.adjacent_merge_compact(compaction_cfg).get();
disk_log.adjacent_merge_compact(compaction_cfg).get();
Expand All @@ -608,7 +610,12 @@ FIXTURE_TEST(test_concurrent_truncate_and_compaction, storage_test_fixture) {
auto ts = now();
auto sleep_ms1 = random_generators::get_int(0, 100);
housekeeping_config housekeeping_cfg(
ts, std::nullopt, model::offset::max(), ss::default_priority_class(), as);
ts,
std::nullopt,
model::offset::max(),
std::nullopt,
ss::default_priority_class(),
as);
auto f1 = ss::sleep(sleep_ms1 * 1ms).then([&] {
return log->housekeeping(housekeeping_cfg);
});
Expand Down
30 changes: 24 additions & 6 deletions src/v/storage/tests/segment_deduplication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ TEST(FindSlidingRangeTest, TestCollectSegments) {
for (int start = 0; start < 30; start += 5) {
for (int end = start; end < 30; end += 5) {
compaction_config cfg(
model::offset{end}, ss::default_priority_class(), never_abort);
model::offset{end},
std::nullopt,
ss::default_priority_class(),
never_abort);
auto segs = disk_log.find_sliding_range(cfg, model::offset{start});
if (end - start < 10) {
// If the compactible range isn't a full segment, we can't
Expand All @@ -98,7 +101,10 @@ TEST(FindSlidingRangeTest, TestCollectExcludesPrevious) {
auto cleanup = ss::defer([&] { b.stop().get(); });
auto& disk_log = b.get_disk_log_impl();
compaction_config cfg(
model::offset{30}, ss::default_priority_class(), never_abort);
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);
auto segs = disk_log.find_sliding_range(cfg);
ASSERT_EQ(3, segs.size());
ASSERT_EQ(segs.front()->offsets().get_base_offset(), model::offset{0});
Expand Down Expand Up @@ -129,7 +135,10 @@ TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) {
auto cleanup = ss::defer([&] { b.stop().get(); });
auto& disk_log = b.get_disk_log_impl();
compaction_config cfg(
model::offset{30}, ss::default_priority_class(), never_abort);
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);
auto segs = disk_log.find_sliding_range(cfg);
// Even though these segments don't have compactible records, they should
// be collected. E.g., they should still be self compacted to rebuild
Expand Down Expand Up @@ -164,7 +173,10 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) {
auto& disk_log = b.get_disk_log_impl();
auto& segs = disk_log.segments();
compaction_config cfg(
model::offset{30}, ss::default_priority_class(), never_abort);
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);
probe pb;

feature_table.start().get();
Expand Down Expand Up @@ -237,7 +249,10 @@ TEST(BuildOffsetMap, TestBuildMapWithMissingCompactedIndex) {
auto& disk_log = b.get_disk_log_impl();
auto& segs = disk_log.segments();
compaction_config cfg(
model::offset{30}, ss::default_priority_class(), never_abort);
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);
for (const auto& s : segs) {
auto idx_path = s->path().to_compacted_index();
ASSERT_FALSE(ss::file_exists(idx_path.string()).get());
Expand Down Expand Up @@ -279,7 +294,10 @@ TEST(DeduplicateSegmentsTest, TestBadReader) {

// Build an offset map for our log.
compaction_config cfg(
model::offset{0}, ss::default_priority_class(), never_abort);
model::offset{0},
std::nullopt,
ss::default_priority_class(),
never_abort);
simple_key_offset_map all_segs_map(50);
auto map_start_offset = build_offset_map(
cfg,
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/tests/storage_e2e_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ FIXTURE_TEST(test_concurrent_log_eviction_and_append, storage_e2e_fixture) {
/*gc_upper=*/model::timestamp::max(),
/*max_bytes_in_log=*/1,
/*max_collect_offset=*/model::offset::min(),
/*tombstone_retention_ms=*/std::nullopt,
ss::default_priority_class(),
as);

Expand Down
Loading

0 comments on commit 832a996

Please sign in to comment.