Skip to content

Commit

Permalink
s/tests: added test validating concurrent operations
Browse files Browse the repository at this point in the history
Added a test validating concurrent operations of writing and prefix
truncating a log.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Jul 22, 2024
1 parent ffc6a16 commit cc27381
Showing 1 changed file with 110 additions and 3 deletions.
113 changes: 110 additions & 3 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include <fmt/chrono.h>

#include <algorithm>
#include <chrono>
#include <iterator>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -887,7 +888,8 @@ ss::future<storage::append_result> append_exactly(
ss::shared_ptr<storage::log> log,
size_t batch_count,
size_t batch_sz,
std::optional<bytes> key = std::nullopt) {
std::optional<bytes> key = std::nullopt,
model::record_batch_type batch_type = model::record_batch_type::raft_data) {
vassert(
batch_sz > model::packed_record_batch_header_size,
"Batch size must be greater than {}, requested {}",
Expand Down Expand Up @@ -925,8 +927,7 @@ ss::future<storage::append_result> append_exactly(
val_sz -= real_batch_size;

for (int i = 0; i < batch_count; ++i) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset{});
storage::record_batch_builder builder(batch_type, model::offset{});
iobuf value = bytes_to_iobuf(random_generators::get_bytes(val_sz));
builder.add_raw_kv(key_buf.copy(), std::move(value));

Expand Down Expand Up @@ -1007,6 +1008,112 @@ FIXTURE_TEST(write_concurrently_with_gc, storage_test_fixture) {
model::offset(9 + appends * batches_per_append));
};

/**
* This test executes operations which may be executed by Raft layer without
* synchronization. i.e. appends, reads, flushes and prefix truncations. The
* test validates if the offsets are correctly assigned i.e. if any batch did
* not get the same offset assigned twice.
*/
FIXTURE_TEST(append_concurrent_with_prefix_truncate, storage_test_fixture) {
auto cfg = default_log_config(test_dir);

ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
model::offset last_append_base_offset{};
model::offset last_append_end_offset{};
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
auto ntp = model::controller_ntp;

storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
auto log = mgr.manage(std::move(ntp_cfg)).get0();

bool stop = false;
int cnt = 0;
std::vector<model::record_batch_type> types{
model::record_batch_type::raft_data,
model::record_batch_type::raft_configuration};
#ifndef NDEBUG
static constexpr size_t stop_after = 50;
#else
static constexpr size_t stop_after = 1000;
#endif
auto append = [&] {
return append_exactly(
log,
1,
random_generators::get_int(75, 237),
std::nullopt,
random_generators::random_choice(types))
.then([&](storage::append_result result) {
info("append result: {}", result);
vassert(
result.base_offset > last_append_base_offset,
"Invalid append result base offset: {}. The same base offset "
"was already assigned.",
last_append_base_offset,
result.base_offset);
vassert(
result.last_offset > last_append_end_offset,
"Invalid append result last offset: {}. The same last offset "
"was already assigned.",
last_append_end_offset,
result.last_offset);
last_append_base_offset = result.base_offset;
last_append_end_offset = result.last_offset;
cnt++;
if (cnt >= stop_after) {
stop = true;
}
return ss::sleep(
std::chrono::milliseconds(random_generators::get_int(10, 100)));
});
};

auto flush = [&] { return log->flush().discard_result(); };

auto read = [&] {
auto lstats = log->offsets();
return log
->make_reader(storage::log_reader_config(
lstats.start_offset,
model::offset::max(),
ss::default_priority_class()))
.then([](auto reader) {
return ss::sleep(std::chrono::milliseconds(
random_generators::get_int(15, 30)))
.then([r = std::move(reader)]() mutable {
return model::consume_reader_to_memory(
std::move(r), model::no_timeout);
})
.discard_result();
});
};

auto prefix_truncate = [&] {
auto offset = model::next_offset(log->offsets().dirty_offset);

return log
->truncate_prefix(storage::truncate_prefix_config(
offset, ss::default_priority_class()))
.then([offset] {
info("prefix truncate at: {}", offset);
return ss::sleep(
std::chrono::milliseconds(random_generators::get_int(5, 20)));
});
};
/**
* Execute all operations concurrently
*/
auto f_1 = ss::do_until([&] { return stop; }, [&] { return append(); });
auto f_2 = ss::do_until(
[&] { return stop; }, [&] { return prefix_truncate(); });
auto f_3 = ss::do_until([&] { return stop; }, [&] { return read(); });
auto f_4 = ss::do_until([&] { return stop; }, [&] { return flush(); });

ss::when_all(std::move(f_1), std::move(f_2), std::move(f_3), std::move(f_4))
.get();
};

FIXTURE_TEST(empty_segment_recovery, storage_test_fixture) {
auto cfg = default_log_config(test_dir);
auto ntp = model::ntp("default", "test", 0);
Expand Down

0 comments on commit cc27381

Please sign in to comment.