Skip to content

Commit

Permalink
tests: update tests to consider sliding window compaction
Browse files Browse the repository at this point in the history
Now that sliding window compaction is the default, this updates several
tests to accommodate.

The general trend of the updated tests are:
- Many tests expect that running housekeeping self compacts a single
  segment at a time. This isn't true with sliding window compaction. To
  accommodate, this commite updates the compaction configs to force a
  single segment compaction.
- In some cases, similar to above, tests would repeatedly run
  housekeeping to ensure N segments would be self compacted. Instead,
  these tests are updated to run housekeeping once with an appropriate
  collectible offset.
- Updated archival tests to mark compacted segments with the
  window-compacted bit.
  • Loading branch information
andrwng committed Nov 10, 2023
1 parent 1fd242a commit 619f26f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 57 deletions.
65 changes: 36 additions & 29 deletions src/v/archival/tests/ntp_archiver_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ struct reupload_fixture : public archiver_fixture {
}
wait_for_partition_leadership(manifest_ntp);
auto part = app.partition_manager.local().get(manifest_ntp);
tests::cooperative_spin_wait_with_timeout(10s, [part]() mutable {
RPTEST_REQUIRE_EVENTUALLY(10s, [part]() mutable {
return part->high_watermark() >= model::offset(1);
}).get();
});
init_archiver();
}

Expand Down Expand Up @@ -218,15 +218,16 @@ struct reupload_fixture : public archiver_fixture {
manifest_view);
}

ss::lw_shared_ptr<storage::segment> self_compact_next_segment() {
ss::lw_shared_ptr<storage::segment> self_compact_next_segment(
model::offset max_collectible = model::offset::max()) {
auto& seg_set = disk_log_impl()->segments();
auto size_before = seg_set.size();

disk_log_impl()
->housekeeping(storage::housekeeping_config{
model::timestamp::max(),
std::nullopt,
model::offset::max(),
max_collectible,
ss::default_priority_class(),
abort_source})
.get();
Expand Down Expand Up @@ -313,7 +314,8 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
// Mark first segment compacted, and re-upload, now only one segment is
// uploaded.
reset_http_call_state();
auto seg = self_compact_next_segment();
auto seg = self_compact_next_segment(
stm_manifest.first_addressable_segment()->committed_offset);

expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {1, 0, 0}};
upload_and_verify(archiver.value(), expected);
Expand All @@ -330,7 +332,8 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});

// Mark second segment as compacted and re-upload.
seg = self_compact_next_segment();
seg = self_compact_next_segment(
std::next(stm_manifest.first_addressable_segment())->committed_offset);

reset_http_call_state();

Expand Down Expand Up @@ -386,7 +389,6 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
// Mark both segments compacted, and re-upload. One concatenated segment is
// uploaded.
reset_http_call_state();
self_compact_next_segment();
auto seg = self_compact_next_segment();

expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {1, 0, 0}};
Expand Down Expand Up @@ -427,8 +429,8 @@ FIXTURE_TEST(

listen();

self_compact_next_segment();
auto seg = self_compact_next_segment();
// Self-compact just the first couple segments.
self_compact_next_segment(model::offset{999});

archival::ntp_archiver::batch_result expected{{0, 0, 0}, {1, 0, 0}};
upload_and_verify(archiver.value(), expected);
Expand Down Expand Up @@ -527,8 +529,12 @@ FIXTURE_TEST(test_upload_both_compacted_and_non_compacted, reupload_fixture) {

// Self-compact the first segment and re-upload. One
// compacted and one non-compacted segments are uploaded.
//
// NOTE: we can only compact up to what's been uploaded, since that
// determines the max collectible offset.
reset_http_call_state();
auto seg = self_compact_next_segment();
auto seg = self_compact_next_segment(
manifest.first_addressable_segment()->committed_offset);

expected = archival::ntp_archiver::batch_result{{1, 0, 0}, {1, 0, 0}};
upload_and_verify(archiver.value(), expected, model::offset::max());
Expand Down Expand Up @@ -593,7 +599,8 @@ FIXTURE_TEST(test_both_uploads_with_one_failing, reupload_fixture) {
// Self-compact the first segment and re-upload. One compacted
// and one non-compacted segments are uploaded.
reset_http_call_state();
auto seg = self_compact_next_segment();
auto seg = self_compact_next_segment(
disk_log_impl()->segments().begin()->get()->offsets().committed_offset);

// Fail the first compacted upload
fail_request_if(
Expand Down Expand Up @@ -723,12 +730,13 @@ FIXTURE_TEST(test_upload_when_reupload_disabled, reupload_fixture) {
}

FIXTURE_TEST(test_upload_limit, reupload_fixture) {
// NOTE: different terms so compaction leaves one segment each.
std::vector<segment_desc> segments = {
{manifest_ntp, model::offset(0), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(10), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(20), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(30), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(40), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(10), model::term_id(2), 10, 2},
{manifest_ntp, model::offset(20), model::term_id(3), 10, 2},
{manifest_ntp, model::offset(30), model::term_id(4), 10, 2},
{manifest_ntp, model::offset(40), model::term_id(5), 10, 2},
};

initialize(segments);
Expand All @@ -747,9 +755,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
get_targets().find(manifest_url)->second.content);

verify_segment_request("0-1-v1.log", manifest);
verify_segment_request("10-1-v1.log", manifest);
verify_segment_request("20-1-v1.log", manifest);
verify_segment_request("30-1-v1.log", manifest);
verify_segment_request("10-2-v1.log", manifest);
verify_segment_request("20-3-v1.log", manifest);
verify_segment_request("30-4-v1.log", manifest);

BOOST_REQUIRE(part->archival_meta_stm());
const cloud_storage::partition_manifest& stm_manifest
Expand All @@ -759,6 +767,7 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});

// Create four non-compacted segments to starve out the upload limit.
// NOTE: uploaded 4 segments, so offset is 54 at the start
for (auto i = 0; i < 3; ++i) {
auto& last_segment = disk_log_impl()->segments().back();
write_random_batches(last_segment, 10, 2);
Expand All @@ -768,30 +777,28 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
create_segment(
{manifest_ntp,
last_segment->offsets().committed_offset + model::offset{1},
model::term_id{2},
model::term_id{6},
10});
}

reset_http_call_state();

// Mark four segments as compacted, so they are valid for upload
ss::lw_shared_ptr<storage::segment> seg;
for (size_t i = 0; i < 4; ++i) {
seg = self_compact_next_segment();
}
seg = self_compact_next_segment(model::offset(39));

expected = archival::ntp_archiver::batch_result{{4, 0, 0}, {0, 0, 0}};
upload_and_verify(archiver.value(), expected, model::offset::max());
BOOST_REQUIRE_EQUAL(get_requests().size(), 9);

verify_segment_request(
"40-1-v1.log", part->archival_meta_stm()->manifest());
"40-5-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"50-2-v1.log", part->archival_meta_stm()->manifest());
"50-6-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"60-2-v1.log", part->archival_meta_stm()->manifest());
"65-6-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"70-2-v1.log", part->archival_meta_stm()->manifest());
"85-6-v1.log", part->archival_meta_stm()->manifest());

BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
Expand All @@ -808,9 +815,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
verify_concat_segment_request(
{
"0-1-v1.log",
"10-1-v1.log",
"20-1-v1.log",
"30-1-v1.log",
"10-2-v1.log",
"20-3-v1.log",
"30-4-v1.log",
},
part->archival_meta_stm()->manifest());

Expand Down
2 changes: 1 addition & 1 deletion src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ FIXTURE_TEST(
->housekeeping(storage::housekeeping_config(
model::timestamp::now(),
std::nullopt,
model::offset::max(),
model::offset{999},
ss::default_priority_class(),
as))
.get0();
Expand Down
9 changes: 5 additions & 4 deletions src/v/archival/tests/segment_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_candidate_generation) {

for (auto i : spec.compacted_segment_indices) {
b.get_segment(i).mark_as_finished_self_compaction();
b.get_segment(i).mark_as_finished_windowed_compaction();
}

size_t max_size = b.get_segment(0).size_bytes()
Expand Down Expand Up @@ -837,10 +838,8 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
b | storage::add_segment(*first)
| storage::add_random_batch(*first, spec.last_segment_num_records);

// Compaction only works one segment at a time
for (auto i = 0; i < spec.compacted_segment_indices.size(); ++i) {
b.gc(model::timestamp::max(), std::nullopt).get();
}
// Compaction will rewrite each segment.
b.gc(model::timestamp::max(), std::nullopt).get();

size_t max_size = b.get_segment(0).size_bytes()
+ b.get_segment(1).size_bytes()
Expand Down Expand Up @@ -918,6 +917,7 @@ SEASTAR_THREAD_TEST_CASE(test_same_size_reupload_skipped) {
// segments for re-upload. The upload candidate should be a noop
// since the selected reupload has the same size as the existing segment.
b.get_segment(0).mark_as_finished_self_compaction();
b.get_segment(0).mark_as_finished_windowed_compaction();

{
archival::segment_collector collector{
Expand Down Expand Up @@ -957,6 +957,7 @@ SEASTAR_THREAD_TEST_CASE(test_same_size_reupload_skipped) {
// should be a no-op since the reupload of the two local segments
// results in a segment of the same size as the one that should be replaced.
b.get_segment(1).mark_as_finished_self_compaction();
b.get_segment(1).mark_as_finished_windowed_compaction();

{
archival::segment_collector collector{
Expand Down
7 changes: 6 additions & 1 deletion src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ segment_layout write_random_batches(

auto leftover_records = records % records_per_batch;
int full_batches_count = records / records_per_batch;
// If the segment already includes records, start at the next offset.
auto full_batches = model::test::make_random_batches(
model::test::record_batch_spec{
.offset = seg->offsets().base_offset,
.offset = seg->offsets().committed_offset == model::offset{}
? seg->offsets().base_offset
: (
seg->offsets().committed_offset + model::offset_delta{1}),
.allow_compression = true,
.count = full_batches_count,
.records = records_per_batch,
Expand Down Expand Up @@ -521,6 +525,7 @@ void populate_log(storage::disk_log_builder& b, const log_spec& spec) {

for (auto i : spec.compacted_segment_indices) {
b.get_segment(i).mark_as_finished_self_compaction();
b.get_segment(i).mark_as_finished_windowed_compaction();
}
}

Expand Down
35 changes: 13 additions & 22 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1544,10 +1544,9 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) {
ss::default_priority_class(),
as);

// There are 4 segments, and the last is the active segments. The first two
// will merge, and the third will be compacted but not merged.
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
// Self compactions complete.
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);

// Check if it honors max_compactible offset by resetting it to the base
Expand All @@ -1557,16 +1556,15 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) {
log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);

// reset
// The segment count will be reduced again.
c_cfg.compact.max_collectible_offset = model::offset::max();

log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);

log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);

// no change since we can't combine with appender segment
// No change since we can't combine with appender segment.
log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);
all_have_broker_timestamp();
Expand Down Expand Up @@ -1613,10 +1611,7 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) {
ss::default_priority_class(),
as);

log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
// compact all the individual segments
log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

Expand All @@ -1629,7 +1624,6 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) {
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);

for (int i = 0; i < 5; i++) {
Expand Down Expand Up @@ -1692,10 +1686,6 @@ FIXTURE_TEST(max_adjacent_segment_compaction, storage_test_fixture) {

// self compaction steps
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

// the first two segments are combined 2+2=4 < 6 MB
Expand Down Expand Up @@ -1906,9 +1896,6 @@ FIXTURE_TEST(compaction_backlog_calculation, storage_test_fixture) {
+ self_seg_compaction_sz);
// self compaction steps
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();
log->housekeeping(c_cfg).get0();

BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
auto new_backlog_size = log->compaction_backlog();
Expand Down Expand Up @@ -3027,6 +3014,7 @@ struct compact_test_args {
long num_compactable_msg;
long msg_per_segment;
long segments;
long expected_compacted_segments;
};

static void
Expand Down Expand Up @@ -3105,9 +3093,10 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) {
tlog.info("post-compact stats: {}, analysis: {}", final_stats, final_gaps);
BOOST_REQUIRE_EQUAL(
final_stats.committed_offset, args.segments * args.msg_per_segment);

// we used the same key for all messages, so we should have one huge gap at
// the beginning of the log
BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, 1);
// the beginning of each compacted segment
BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, args.expected_compacted_segments);
BOOST_REQUIRE_EQUAL(final_gaps.first_gap_start, model::offset(0));

// If adjacent segment compaction worked in order from oldest to newest, we
Expand All @@ -3132,7 +3121,8 @@ FIXTURE_TEST(test_max_compact_offset_mid_segment, storage_test_fixture) {
{.max_compact_offs = model::offset(150),
.num_compactable_msg = 100,
.msg_per_segment = 100,
.segments = 3},
.segments = 3,
.expected_compacted_segments = 1},
*this);
}

Expand All @@ -3144,7 +3134,8 @@ FIXTURE_TEST(test_max_compact_offset_unset, storage_test_fixture) {
// after writing the third segment.
.num_compactable_msg = 200,
.msg_per_segment = 100,
.segments = 3},
.segments = 3,
.expected_compacted_segments = 3},
*this);
}

Expand Down

0 comments on commit 619f26f

Please sign in to comment.