From 18db0f55a026c08ca50c6b7e039945a3262762ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Wed, 10 Jul 2024 15:27:33 +0000 Subject: [PATCH] s/tests: added test validating concurrent operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a test validating concurrent operations of writing and prefix truncating a log. Signed-off-by: Michał Maślanka --- src/v/storage/tests/storage_e2e_test.cc | 113 +++++++++++++++++++++++- 1 file changed, 110 insertions(+), 3 deletions(-) diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc index d0f5b06f1c59..734044cd7794 100644 --- a/src/v/storage/tests/storage_e2e_test.cc +++ b/src/v/storage/tests/storage_e2e_test.cc @@ -51,6 +51,7 @@ #include #include +#include #include #include #include @@ -897,7 +898,8 @@ ss::future append_exactly( ss::shared_ptr log, size_t batch_count, size_t batch_sz, - std::optional key = std::nullopt) { + std::optional key = std::nullopt, + model::record_batch_type batch_type = model::record_batch_type::raft_data) { vassert( batch_sz > model::packed_record_batch_header_size, "Batch size must be greater than {}, requested {}", @@ -935,8 +937,7 @@ ss::future append_exactly( val_sz -= real_batch_size; for (int i = 0; i < batch_count; ++i) { - storage::record_batch_builder builder( - model::record_batch_type::raft_data, model::offset{}); + storage::record_batch_builder builder(batch_type, model::offset{}); iobuf value = bytes_to_iobuf(random_generators::get_bytes(val_sz)); builder.add_raw_kv(key_buf.copy(), std::move(value)); @@ -1018,6 +1019,112 @@ FIXTURE_TEST(write_concurrently_with_gc, storage_test_fixture) { model::offset(9 + appends * batches_per_append)); }; +/** + * This test executes operations which may be executed by Raft layer without + * synchronization. i.e. appends, reads, flushes and prefix truncations. The + * test validates if the offsets are correctly assigned i.e. if any batch did + * not get the same offset assigned twice. + */ +FIXTURE_TEST(append_concurrent_with_prefix_truncate, storage_test_fixture) { + auto cfg = default_log_config(test_dir); + + ss::abort_source as; + storage::log_manager mgr = make_log_manager(cfg); + model::offset last_append_base_offset{}; + model::offset last_append_end_offset{}; + auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); }); + auto ntp = model::controller_ntp; + + storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir); + auto log = mgr.manage(std::move(ntp_cfg)).get0(); + + bool stop = false; + int cnt = 0; + std::vector types{ + model::record_batch_type::raft_data, + model::record_batch_type::raft_configuration}; +#ifndef NDEBUG + static constexpr size_t stop_after = 10; +#else + static constexpr size_t stop_after = 200; +#endif + auto append = [&] { + return append_exactly( + log, + 1, + random_generators::get_int(75, 237), + std::nullopt, + random_generators::random_choice(types)) + .then([&](storage::append_result result) { + info("append result: {}", result); + vassert( + result.base_offset > last_append_base_offset, + "Invalid append result base offset: {}. The same base offset " + "was already assigned.", + last_append_base_offset, + result.base_offset); + vassert( + result.last_offset > last_append_end_offset, + "Invalid append result last offset: {}. The same last offset " + "was already assigned.", + last_append_end_offset, + result.last_offset); + last_append_base_offset = result.base_offset; + last_append_end_offset = result.last_offset; + cnt++; + if (cnt >= stop_after) { + stop = true; + } + return ss::sleep( + std::chrono::milliseconds(random_generators::get_int(10, 100))); + }); + }; + + auto flush = [&] { return log->flush().discard_result(); }; + + auto read = [&] { + auto lstats = log->offsets(); + return log + ->make_reader(storage::log_reader_config( + lstats.start_offset, + model::offset::max(), + ss::default_priority_class())) + .then([](auto reader) { + return ss::sleep(std::chrono::milliseconds( + random_generators::get_int(15, 30))) + .then([r = std::move(reader)]() mutable { + return model::consume_reader_to_memory( + std::move(r), model::no_timeout); + }) + .discard_result(); + }); + }; + + auto prefix_truncate = [&] { + auto offset = model::next_offset(log->offsets().dirty_offset); + + return log + ->truncate_prefix(storage::truncate_prefix_config( + offset, ss::default_priority_class())) + .then([offset] { + info("prefix truncate at: {}", offset); + return ss::sleep( + std::chrono::milliseconds(random_generators::get_int(5, 20))); + }); + }; + /** + * Execute all operations concurrently + */ + auto f_1 = ss::do_until([&] { return stop; }, [&] { return append(); }); + auto f_2 = ss::do_until( + [&] { return stop; }, [&] { return prefix_truncate(); }); + auto f_3 = ss::do_until([&] { return stop; }, [&] { return read(); }); + auto f_4 = ss::do_until([&] { return stop; }, [&] { return flush(); }); + + ss::when_all(std::move(f_1), std::move(f_2), std::move(f_3), std::move(f_4)) + .get(); +}; + FIXTURE_TEST(empty_segment_recovery, storage_test_fixture) { auto cfg = default_log_config(test_dir); auto ntp = model::ntp("default", "test", 0);