Merge pull request #24540 from bharathv/manual-backport-24490-v24.2.x-691

[v24.2.x] rm_stm/tests: add a stress test for concurrent eviction / replication / snapshot
bharathv authored Dec 12, 2024
2 parents ee0c765 + 163b669 commit c19a267
Showing 7 changed files with 209 additions and 21 deletions.
10 changes: 8 additions & 2 deletions src/v/cluster/producer_state_manager.cc
@@ -41,7 +41,7 @@ producer_state_manager::producer_state_manager(

ss::future<> producer_state_manager::start() {
_reaper.set_callback([this] { evict_excess_producers(); });
_reaper.arm(period);
_reaper.arm(_reaper_period);
vlog(clusterlog.info, "Started producer state manager");
return ss::now();
}
@@ -69,6 +69,12 @@ void producer_state_manager::setup_metrics() {
sm::description("Number of evicted producers so far."))});
}

void producer_state_manager::rearm_eviction_timer_for_testing(
std::chrono::milliseconds new_period) {
_reaper_period = new_period;
_reaper.rearm(ss::lowres_clock::now() + _reaper_period);
}

void producer_state_manager::register_producer(
producer_state& state, std::optional<model::vcluster_id> vcluster) {
vlog(
@@ -97,7 +103,7 @@ void producer_state_manager::evict_excess_producers() {
_cache.evict_older_than<ss::lowres_system_clock>(
ss::lowres_system_clock::now() - _producer_expiration_ms());
if (!_gate.is_closed()) {
_reaper.arm(period);
_reaper.arm(_reaper_period);
}
}

6 changes: 4 additions & 2 deletions src/v/cluster/producer_state_manager.h
@@ -45,8 +45,10 @@ class producer_state_manager {
*/
void touch(producer_state&, std::optional<model::vcluster_id>);

void rearm_eviction_timer_for_testing(std::chrono::milliseconds);

private:
static constexpr std::chrono::seconds period{5};
std::chrono::milliseconds _reaper_period{5000};
/**
* Constant to be used when a partition has no vcluster_id assigned.
*/
@@ -83,7 +85,7 @@ class producer_state_manager {
config::binding<size_t> _virtual_cluster_min_producer_ids;
// cache of all producers on this shard
cache_t _cache;
ss::timer<ss::steady_clock_type> _reaper;
ss::timer<ss::lowres_clock> _reaper;
ss::gate _gate;
metrics::internal_metric_groups _metrics;

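Taken together, the producer_state_manager changes replace the fixed five-second `period` constant with a mutable `_reaper_period` and move the reaper timer onto `ss::lowres_clock`, which is what lets `rearm_eviction_timer_for_testing()` compute an absolute deadline from `ss::lowres_clock::now()`. A minimal standalone sketch of that rearm pattern follows, assuming only Seastar's public timer API; the class and member names below are hypothetical and not part of this commit.

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>

#include <chrono>

namespace ss = seastar;

class periodic_reaper {
public:
    void start() {
        _timer.set_callback([this] { tick(); });
        _timer.arm(_period); // relative arm: fire once after _period
    }

    // Tests shrink the period and want it to take effect immediately:
    // rearm() cancels the pending expiry and schedules a new absolute
    // deadline computed from the same clock the timer runs on.
    void set_period(std::chrono::milliseconds new_period) {
        _period = new_period;
        _timer.rearm(ss::lowres_clock::now() + _period);
    }

private:
    void tick() {
        // ... evict excess producers here, then schedule the next round ...
        _timer.arm(_period);
    }

    std::chrono::milliseconds _period{5000};
    ss::timer<ss::lowres_clock> _timer;
};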
59 changes: 46 additions & 13 deletions src/v/cluster/rm_stm.cc
@@ -90,7 +90,8 @@ rm_stm::rm_stm(
, _feature_table(feature_table)
, _ctx_log(txlog, ssx::sformat("[{}]", c->ntp()))
, _producer_state_manager(producer_state_manager)
, _vcluster_id(vcluster_id) {
, _vcluster_id(vcluster_id)
, _producers_pending_cleanup(std::numeric_limits<size_t>::max()) {
setup_metrics();
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
@@ -114,6 +115,18 @@ rm_stm::rm_stm(
e);
});
});

ssx::repeat_until_gate_closed_or_aborted(_gate, _as, [this] {
return cleanup_evicted_producers().handle_exception(
[h = _gate.hold(), this](const std::exception_ptr& ex) {
if (!ssx::is_shutdown_exception(ex)) {
vlog(
_ctx_log.warn,
"encountered an exception while cleaning producers: {}",
ex);
}
});
});
}

ss::future<model::offset> rm_stm::bootstrap_committed_offset() {
Expand All @@ -129,6 +142,8 @@ ss::future<model::offset> rm_stm::bootstrap_committed_offset() {

std::pair<producer_ptr, rm_stm::producer_previously_known>
rm_stm::maybe_create_producer(model::producer_identity pid) {
// note: must be called under state_lock in shared/read mode.

// Double lookup because of two reasons
// 1. we are forced to use a ptr as map value_type because producer_state is
// not movable
@@ -148,23 +163,37 @@ rm_stm::maybe_create_producer(model::producer_identity pid) {
return std::make_pair(producer, producer_previously_known::no);
}

void rm_stm::cleanup_producer_state(model::producer_identity pid) {
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
return;
ss::future<> rm_stm::cleanup_evicted_producers() {
while (!_as.abort_requested() && !_gate.is_closed()) {
auto pid = co_await _producers_pending_cleanup.pop_eventually();
auto units = co_await _state_lock.hold_read_lock();
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
co_return;
}
_producers.erase(it);
vlog(_ctx_log.trace, "removed producer: {}", pid);
}
_producers.erase(it);
}
}

void rm_stm::cleanup_producer_state(model::producer_identity pid) noexcept {
if (_as.abort_requested() || _gate.is_closed()) {
return;
}
_producers_pending_cleanup.push(std::move(pid));
};

ss::future<> rm_stm::reset_producers() {
// note: must always be called under exclusive write lock to
// avoid concurrent state changes to _producers.
co_await ss::max_concurrent_for_each(
_producers.begin(), _producers.end(), 32, [this](auto& it) {
auto& producer = it.second;
@@ -738,6 +767,8 @@ ss::future<result<kafka_result>> rm_stm::do_replicate(

ss::future<> rm_stm::stop() {
_as.request_abort();
_producers_pending_cleanup.abort(
std::make_exception_ptr(ss::abort_requested_exception{}));
auto_abort_timer.cancel();
co_await _gate.close();
co_await reset_producers();
@@ -1637,6 +1668,8 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {

ss::future<>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto units = co_await _state_lock.hold_write_lock();

vlog(
_ctx_log.trace,
"applying snapshot with last included offset: {}",
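The rm_stm.cc changes turn producer cleanup into a deferred operation: the eviction hook now only enqueues the pid on `_producers_pending_cleanup`, a background fiber started in the constructor pops ids and erases the producer under the shared `_state_lock` (so cleanup cannot race with snapshot application, which now takes the write lock), and `stop()` aborts the queue to wake the fiber. Below is a self-contained sketch of that pattern, assuming Seastar's `ss::queue`, `ss::rwlock`, `ss::gate`, and `ss::abort_source` semantics; the class name and the plain `std::map` are hypothetical stand-ins, and the real code additionally refuses to erase a producer with an in-progress transaction.

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/rwlock.hh>

#include <cstdint>
#include <limits>
#include <map>

namespace ss = seastar;

class deferred_producer_cleanup {
public:
    // Called from the eviction hook; must never block, throw, or touch
    // the producer map directly.
    void request_cleanup(int64_t pid) noexcept {
        if (_as.abort_requested() || _gate.is_closed()) {
            return;
        }
        _pending.push(std::move(pid));
    }

    // Background fiber: drain the queue and erase under the read lock.
    ss::future<> run() {
        auto holder = _gate.hold();
        while (!_as.abort_requested() && !_gate.is_closed()) {
            int64_t pid;
            try {
                pid = co_await _pending.pop_eventually();
            } catch (const ss::abort_requested_exception&) {
                break; // queue aborted during shutdown
            }
            // Shared lock: cleanup may interleave with replication, but
            // never with a writer (snapshot apply / reset) holding the
            // exclusive lock.
            auto units = co_await _lock.hold_read_lock();
            _producers.erase(pid);
        }
    }

    ss::future<> stop() {
        _as.request_abort();
        // Wake a fiber blocked in pop_eventually().
        _pending.abort(
          std::make_exception_ptr(ss::abort_requested_exception{}));
        co_await _gate.close();
    }

private:
    ss::queue<int64_t> _pending{std::numeric_limits<size_t>::max()};
    ss::rwlock _lock;
    ss::gate _gate;
    ss::abort_source _as;
    std::map<int64_t, int> _producers; // placeholder for real producer state
};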
5 changes: 4 additions & 1 deletion src/v/cluster/rm_stm.h
@@ -240,7 +240,8 @@ class rm_stm final : public raft::persisted_stm<> {
= ss::bool_class<struct new_producer_created_tag>;
std::pair<tx::producer_ptr, producer_previously_known>
maybe_create_producer(model::producer_identity);
void cleanup_producer_state(model::producer_identity);
void cleanup_producer_state(model::producer_identity) noexcept;
ss::future<> cleanup_evicted_producers();
ss::future<> reset_producers();
ss::future<checked<model::term_id, tx::errc>> do_begin_tx(
model::term_id,
@@ -412,6 +413,8 @@ class rm_stm final : public raft::persisted_stm<> {
// producers because epoch is unused.
producers_t _producers;

ss::queue<model::producer_identity> _producers_pending_cleanup;

// All the producers with open transactions in this partition.
// The list is sorted by the open transaction begin offset, so
// the first entry in the list is the earliest open transaction
48 changes: 45 additions & 3 deletions src/v/cluster/tests/rm_stm_test_fixture.h
@@ -10,7 +10,7 @@
#pragma once
#include "cluster/producer_state_manager.h"
#include "cluster/rm_stm.h"
#include "config/property.h"
#include "config/mock_property.h"
#include "raft/tests/simple_raft_fixture.h"

#include <seastar/core/sharded.hh>
@@ -21,10 +21,14 @@ static prefix_logger ctx_logger{logger, ""};
struct rm_stm_test_fixture : simple_raft_fixture {
void create_stm_and_start_raft(
storage::ntp_config::default_overrides overrides = {}) {
max_concurent_producers.start(std::numeric_limits<size_t>::max()).get();
producer_expiration_ms.start(std::chrono::milliseconds::max()).get();
producer_state_manager
.start(
config::mock_binding(std::numeric_limits<uint64_t>::max()),
config::mock_binding(std::chrono::milliseconds::max()),
ss::sharded_parameter(
[this] { return max_concurent_producers.local().bind(); }),
ss::sharded_parameter(
[this] { return producer_expiration_ms.local().bind(); }),
config::mock_binding(std::numeric_limits<uint64_t>::max()))
.get();
producer_state_manager
@@ -51,6 +55,8 @@ struct rm_stm_test_fixture : simple_raft_fixture {
if (_started) {
stop_all();
producer_state_manager.stop().get();
producer_expiration_ms.stop().get();
max_concurent_producers.stop().get();
}
}

@@ -62,6 +68,17 @@
return _stm->do_take_local_snapshot(version, {});
}

void update_producer_expiration(std::chrono::milliseconds value) {
producer_expiration_ms
.invoke_on_all(
[value](auto& local) mutable { local.update(std::move(value)); })
.get();
}

auto apply_raft_snapshot(const iobuf& buf) {
return _stm->apply_raft_snapshot(buf);
}

auto apply_snapshot(raft::stm_snapshot_header hdr, iobuf buf) {
return _stm->apply_local_snapshot(hdr, std::move(buf));
}
@@ -73,6 +90,31 @@

auto get_expired_producers() const { return _stm->get_expired_producers(); }

auto stm_read_lock() { return _stm->_state_lock.hold_read_lock(); }

auto maybe_create_producer(model::producer_identity pid) {
return stm_read_lock().then([pid, this](auto /*units*/) {
return _stm->maybe_create_producer(pid);
});
}

auto reset_producers() {
return _stm->_state_lock.hold_write_lock().then([this](auto units) {
return _stm->reset_producers().then([units = std::move(units)] {});
});
}

auto rearm_eviction_timer(std::chrono::milliseconds period) {
return producer_state_manager
.invoke_on_all([period](auto& mgr) {
return mgr.rearm_eviction_timer_for_testing(period);
})
.get();
}

ss::sharded<config::mock_property<size_t>> max_concurent_producers;
ss::sharded<config::mock_property<std::chrono::milliseconds>>
producer_expiration_ms;
ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
ss::sharded<cluster::tx::producer_state_manager> producer_state_manager;
ss::shared_ptr<cluster::rm_stm> _stm;
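The fixture now owns the tunables as `ss::sharded<config::mock_property<...>>` instances and passes them to the sharded manager through `ss::sharded_parameter`, so each shard binds to its local property and later observes `update()` calls issued via `invoke_on_all()`. A rough sketch of that wiring, assuming the `config::mock_property`/`config::binding` behavior visible in this diff; `reaper_service` and the free function are hypothetical.

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

#include <chrono>

#include "config/mock_property.h" // Redpanda test helper, as included above

namespace ss = seastar;

struct reaper_service {
    explicit reaper_service(config::binding<std::chrono::milliseconds> period)
      : _period(std::move(period)) {}
    ss::future<> stop() { return ss::make_ready_future<>(); }
    config::binding<std::chrono::milliseconds> _period;
};

ss::future<> wire_per_shard_bindings(
  ss::sharded<config::mock_property<std::chrono::milliseconds>>& expiration,
  ss::sharded<reaper_service>& service) {
    co_await expiration.start(std::chrono::milliseconds::max());
    // sharded_parameter evaluates the lambda once per shard, so every
    // service instance binds to its own local copy of the mock property.
    co_await service.start(ss::sharded_parameter(
      [&expiration] { return expiration.local().bind(); }));
    // A test can later tighten the value on all shards at once; bound
    // readers observe the new value on their next read.
    co_await expiration.invoke_on_all([](auto& prop) {
        prop.update(std::chrono::milliseconds{0});
    });
}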
100 changes: 100 additions & 0 deletions src/v/cluster/tests/rm_stm_tests.cc
@@ -12,6 +12,7 @@
#include "cluster/tests/randoms.h"
#include "cluster/tests/rm_stm_test_fixture.h"
#include "finjector/hbadger.h"
#include "finjector/stress_fiber.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
@@ -931,3 +932,102 @@ FIXTURE_TEST(test_tx_expiration_without_data_batches, rm_stm_test_fixture) {
!= expired.end();
}).get0();
}

/*
* This test ensures concurrent evictions can happen in the presence of
* replication operations and operations that reset the state (snapshots,
* partition stop).
*/
FIXTURE_TEST(test_concurrent_producer_evictions, rm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;
stm.start().get0();
stm.testing_only_disable_auto_abort();

wait_for_confirmed_leader();
wait_for_meta_initialized();

// Ensure eviction runs with higher frequency
// and evicts everything possible.
update_producer_expiration(0ms);
rearm_eviction_timer(1ms);

stress_fiber_manager stress_mgr;
stress_mgr.start(
{.min_spins_per_scheduling_point = random_generators::get_int(50, 100),
.max_spins_per_scheduling_point = random_generators::get_int(500, 1000),
.num_fibers = random_generators::get_int<size_t>(5, 10)});
auto stop = ss::defer([&stress_mgr] { stress_mgr.stop().get(); });

int64_t counter = 0;
ss::abort_source as;
ss::gate gate;
size_t max_replication_fibers = 1000;

// Simulates replication.
// In each iteration of the loop, we create some producers and randomly
// hold the producer state lock on some of them (thus preventing eviction).
// This is roughly the lifecycle of replicate requests using a producer
// state. It creates a stream of producer states in a tight loop, some
// evictable and some non-evictable, while eviction constantly runs in the
// background.
auto replicate_f = ss::do_until(
[&as] { return as.abort_requested(); },
[&, this] {
std::vector<ss::future<>> spawn_replicate_futures;
for (int i = 0; i < 5; i++) {
auto maybe_replicate_f
= maybe_create_producer(model::producer_identity{counter++, 0})
.then([&, this](auto result) {
auto producer = result.first;
if (
gate.get_count() < max_replication_fibers
&& tests::random_bool()) {
// simulates replication.
ssx::spawn_with_gate(gate, [this, producer] {
return stm_read_lock().then([producer](
auto stm_units) {
return producer
->run_with_lock([](auto units) {
auto sleep_ms
= std::chrono::milliseconds{
random_generators::get_int(3)};
return ss::sleep(sleep_ms).finally(
[units = std::move(units)] {});
})
.handle_exception_type(
[producer](
const ss::gate_closed_exception&) {
vlog(
logger.info,
"producer {} already evicted, "
"ignoring",
producer->id());
})
.finally(
[producer,
stm_units = std::move(stm_units)] {});
});
});
}
});
spawn_replicate_futures.push_back(std::move(maybe_replicate_f));
}

return ss::when_all_succeed(std::move(spawn_replicate_futures))
.then([]() { return ss::sleep(1ms); });
});

// Simulates raft snapshot application / partition shutdown.
// Applying a snapshot is a stop-the-world operation that resets
// all the producers.
auto reset_f = ss::do_until(
[&as] { return as.abort_requested(); },
[&, this] {
return reset_producers().then([] { return ss::sleep(3ms); });
});

ss::sleep(20s).finally([&as] { as.request_abort(); }).get();
ss::when_all_succeed(std::move(replicate_f), std::move(reset_f)).get();
gate.close().get();
}
2 changes: 2 additions & 0 deletions src/v/cluster/tests/tx_compaction_tests.cc
@@ -25,6 +25,8 @@ using cluster::tx_executor;
_data_dir = "test_dir_" + random_generators::gen_alphanum_string(6); \
stop_all(); \
producer_state_manager.stop().get(); \
producer_expiration_ms.stop().get(); \
max_concurent_producers.stop().get(); \
_stm = nullptr; \
}); \
wait_for_confirmed_leader(); \
