Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented raft::state_machine_manager #12685

Merged
merged 14 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions src/v/archival/tests/archival_metadata_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
#include "model/metadata.h"
#include "model/record.h"
#include "model/timestamp.h"
#include "raft/tests/mux_state_machine_fixture.h"
#include "raft/state_machine_manager.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/tests/simple_raft_fixture.h"
#include "raft/types.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "test_utils/async.h"

#include <seastar/core/io_priority_class.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/noncopyable_function.hh>

Expand All @@ -44,11 +46,11 @@ ss::logger logger{"archival_metadata_stm_test"};
static ss::abort_source never_abort;

struct archival_metadata_stm_base_fixture
: mux_state_machine_fixture
: simple_raft_fixture
, http_imposter_fixture {
using mux_state_machine_fixture::start_raft;
using mux_state_machine_fixture::wait_for_becoming_leader;
using mux_state_machine_fixture::wait_for_confirmed_leader;
using simple_raft_fixture::start_raft;
using simple_raft_fixture::wait_for_becoming_leader;
using simple_raft_fixture::wait_for_confirmed_leader;

archival_metadata_stm_base_fixture(
const archival_metadata_stm_base_fixture&)
Expand Down Expand Up @@ -112,6 +114,7 @@ struct archival_metadata_stm_base_fixture
}

~archival_metadata_stm_base_fixture() override {
stop_all();
cloud_conn_pool.local().shutdown_connections();
cloud_api.stop().get();
cloud_conn_pool.stop().get();
Expand All @@ -128,16 +131,16 @@ struct archival_metadata_stm_base_fixture
struct archival_metadata_stm_fixture : archival_metadata_stm_base_fixture {
archival_metadata_stm_fixture() {
// Archival metadata STM
start_raft();
archival_stm = std::make_unique<cluster::archival_metadata_stm>(
create_raft();
raft::state_machine_manager_builder builder;
archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);

archival_stm->start().get();
_raft->start(std::move(builder)).get();
_started = true;
}

~archival_metadata_stm_fixture() override { archival_stm->stop().get(); }

std::unique_ptr<cluster::archival_metadata_stm> archival_stm;
ss::shared_ptr<cluster::archival_metadata_stm> archival_stm;
};

using cloud_storage::partition_manifest;
Expand Down Expand Up @@ -265,12 +268,12 @@ void check_snapshot_size(
BOOST_REQUIRE(snapshot_exists);

BOOST_REQUIRE(
archival_stm.get_snapshot_size()
archival_stm.get_local_snapshot_size()
== ss::file_size(snapshot_file_path.string()).get());
}

FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {
start_raft();
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
m.add(
Expand Down Expand Up @@ -318,34 +321,32 @@ FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {
cluster::archival_metadata_stm::make_snapshot(ntp_cfg, m, model::offset{42})
.get();

cluster::archival_metadata_stm archival_stm(
raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);

archival_stm.start().get();
_raft->start(std::move(builder)).get();
_started = true;
wait_for_confirmed_leader();

{
std::stringstream s1, s2;
m.serialize_json(s1);
archival_stm.manifest().serialize_json(s2);
archival_stm->manifest().serialize_json(s2);
vlog(logger.info, "original manifest: {}", s1.str());
vlog(logger.info, "restored manifest: {}", s2.str());
}

BOOST_REQUIRE_EQUAL(archival_stm.get_start_offset(), model::offset{100});
BOOST_REQUIRE(archival_stm.manifest() == m);
check_snapshot_size(archival_stm, ntp_cfg);
BOOST_REQUIRE_EQUAL(archival_stm->get_start_offset(), model::offset{100});
BOOST_REQUIRE(archival_stm->manifest() == m);
check_snapshot_size(*archival_stm, ntp_cfg);

// A snapshot constructed with make_snapshot is always clean
BOOST_REQUIRE(
archival_stm.get_dirty()
archival_stm->get_dirty()
== cluster::archival_metadata_stm::state_dirty::clean);

archival_stm.stop().get();
}

FIXTURE_TEST(test_sname_derivation, archival_metadata_stm_base_fixture) {
start_raft();
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());

Expand Down Expand Up @@ -415,14 +416,15 @@ FIXTURE_TEST(test_sname_derivation, archival_metadata_stm_base_fixture) {
cluster::archival_metadata_stm::make_snapshot(ntp_cfg, m, model::offset{42})
.get();

cluster::archival_metadata_stm archival_stm(
raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);

archival_stm.start().get();
auto action = ss::defer([&archival_stm] { archival_stm.stop().get(); });
_raft->start(std::move(builder)).get();
_started = true;
wait_for_confirmed_leader();

auto replaced = archival_stm.manifest().replaced_segments();
auto replaced = archival_stm->manifest().replaced_segments();
BOOST_REQUIRE_EQUAL(
replaced[0].sname_format, cloud_storage::segment_name_format::v1);
BOOST_REQUIRE_EQUAL(
Expand Down Expand Up @@ -556,7 +558,7 @@ class archival_metadata_stm_accessor {
public:
static ss::future<> persist_snapshot(
storage::simple_snapshot_manager& mgr, cluster::stm_snapshot&& snapshot) {
return file_backed_stm_snapshot::persist_snapshot(
return file_backed_stm_snapshot::persist_local_snapshot(
mgr, std::move(snapshot));
}
};
Expand Down Expand Up @@ -586,7 +588,7 @@ ss::future<> make_old_snapshot(
FIXTURE_TEST(
test_archival_metadata_stm_snapshot_version_compatibility,
archival_metadata_stm_base_fixture) {
start_raft();
create_raft();
auto& ntp_cfg = _raft->log_config();
partition_manifest m(ntp_cfg.ntp(), ntp_cfg.get_initial_revision());
m.add(
Expand Down Expand Up @@ -617,16 +619,15 @@ FIXTURE_TEST(

make_old_snapshot(ntp_cfg, m, model::offset{3}).get();

cluster::archival_metadata_stm archival_stm(
raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);

archival_stm.start().get();
_raft->start(std::move(builder)).get();
_started = true;
wait_for_confirmed_leader();

BOOST_REQUIRE(archival_stm.manifest() == m);
check_snapshot_size(archival_stm, ntp_cfg);

archival_stm.stop().get();
BOOST_REQUIRE(archival_stm->manifest() == m);
check_snapshot_size(*archival_stm, ntp_cfg);
}

FIXTURE_TEST(test_archival_stm_batching, archival_metadata_stm_fixture) {
Expand Down
45 changes: 19 additions & 26 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/archival_metadata_stm.h"

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
Expand Down Expand Up @@ -491,7 +492,7 @@ ss::future<> archival_metadata_stm::make_snapshot(
"archival_metadata.snapshot",
raft_priority());

co_await file_backed_stm_snapshot::persist_snapshot(
co_await file_backed_stm_snapshot::persist_local_snapshot(
tmp_snapshot_mgr, std::move(snapshot));
}

Expand Down Expand Up @@ -601,8 +602,8 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
// assumptions about whether batches were replicated or not. Explicitly
// step down if we're still leader and force callers to re-sync in a
// new term with a new leader.
if (_c->is_leader() && _c->term() == current_term) {
co_await _c->step_down(ssx::sformat(
if (_raft->is_leader() && _raft->term() == current_term) {
co_await _raft->step_down(ssx::sformat(
"failed to replicate archival batch in term {}", current_term));
}
co_return result.error();
Expand All @@ -615,8 +616,8 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
co_return errc::shutting_down;
}

if (_c->is_leader() && _c->term() == current_term) {
co_await _c->step_down(ssx::sformat(
if (_raft->is_leader() && _raft->term() == current_term) {
co_await _raft->step_down(ssx::sformat(
"failed to replicate archival batch in term {}", current_term));
}
co_return errc::replication_error;
Expand Down Expand Up @@ -708,7 +709,7 @@ ss::future<std::error_code> archival_metadata_stm::do_add_segments(
co_return errc::success;
}

ss::future<> archival_metadata_stm::apply(model::record_batch b) {
ss::future<> archival_metadata_stm::apply(const model::record_batch& b) {
if (b.header().type == model::record_batch_type::prefix_truncate) {
// Special case handling for prefix_truncate batches: these originate
// in log_eviction_stm, but affect the entire partition, local and
Expand All @@ -727,11 +728,10 @@ ss::future<> archival_metadata_stm::apply(model::record_batch b) {
apply_update_start_kafka_offset(val.kafka_start_offset);
}
});
_insync_offset = b.last_offset();

co_return;
}
if (b.header().type != model::record_batch_type::archival_metadata) {
_insync_offset = b.last_offset();
co_return;
}

Expand Down Expand Up @@ -797,11 +797,10 @@ ss::future<> archival_metadata_stm::apply(model::record_batch b) {
};
});

_insync_offset = b.last_offset();
_manifest->advance_insync_offset(b.last_offset());
}

ss::future<> archival_metadata_stm::handle_raft_snapshot() {
ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
cloud_storage::partition_manifest new_manifest{
_manifest->get_ntp(), _manifest->get_revision_id()};

Expand All @@ -821,7 +820,6 @@ ss::future<> archival_metadata_stm::handle_raft_snapshot() {
cloud_storage_clients::bucket_name{*bucket}, new_manifest, rc_node);

if (res == cloud_storage::download_result::notfound) {
_insync_offset = model::prev_offset(_raft->start_offset());
set_next(_raft->start_offset());
vlog(_logger.info, "handled log eviction, the manifest is absent");
co_return;
Expand All @@ -842,12 +840,9 @@ ss::future<> archival_metadata_stm::handle_raft_snapshot() {
if (iso == model::offset{}) {
// Handle legacy manifests which don't have the 'insync_offset'
// field.
_insync_offset = _manifest->get_last_offset();
} else {
_insync_offset = iso;
iso = _manifest->get_last_offset();
}
auto next_offset = std::max(
_raft->start_offset(), model::next_offset(_insync_offset));
auto next_offset = std::max(_raft->start_offset(), model::next_offset(iso));
set_next(next_offset);

vlog(
Expand All @@ -859,7 +854,7 @@ ss::future<> archival_metadata_stm::handle_raft_snapshot() {
get_last_offset());
}

ss::future<> archival_metadata_stm::apply_snapshot(
ss::future<> archival_metadata_stm::apply_local_snapshot(
stm_snapshot_header header, iobuf&& data) {
auto snap = serde::from_iobuf<snapshot>(std::move(data));

Expand Down Expand Up @@ -914,17 +909,15 @@ ss::future<> archival_metadata_stm::apply_snapshot(
get_last_offset(),
_manifest->get_spillover_map().size());

_last_snapshot_offset = header.offset;
_insync_offset = header.offset;
if (snap.dirty == state_dirty::dirty) {
_last_clean_at = model::offset{0};
} else {
_last_clean_at = _insync_offset;
_last_clean_at = header.offset;
}
co_return;
}

ss::future<stm_snapshot> archival_metadata_stm::take_snapshot() {
ss::future<stm_snapshot> archival_metadata_stm::take_local_snapshot() {
auto segments = segments_from_manifest(*_manifest);
auto replaced = replaced_segments_from_manifest(*_manifest);
auto spillover = spillover_from_manifest(*_manifest);
Expand All @@ -946,12 +939,12 @@ ss::future<stm_snapshot> archival_metadata_stm::take_snapshot() {
vlog(
_logger.debug,
"creating snapshot at offset: {}, remote start_offset: {}, "
"last_offset: "
"{}",
_insync_offset,
"last_offset: {}",
last_applied_offset(),
get_start_offset(),
get_last_offset());
co_return stm_snapshot::create(0, _insync_offset, std::move(snap_data));
co_return stm_snapshot::create(
0, last_applied_offset(), std::move(snap_data));
}

model::offset archival_metadata_stm::max_collectible_offset() {
Expand Down Expand Up @@ -1204,7 +1197,7 @@ archival_metadata_stm::get_segments_to_cleanup() const {

ss::future<> archival_metadata_stm::stop() {
_download_as.request_abort();
co_await raft::state_machine::stop();
co_await persisted_stm<>::stop();
}

const cloud_storage::partition_manifest&
Expand Down
15 changes: 9 additions & 6 deletions src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,15 @@ class archival_metadata_stm final : public persisted_stm<> {

// Users of the stm need to know insync offset in order to pass
// the proper value to mark_clean
model::offset get_insync_offset() const { return _insync_offset; }
model::offset get_insync_offset() const { return last_applied_offset(); }

model::offset get_last_clean_at() const { return _last_clean_at; };

model::offset max_collectible_offset() override;

std::string_view get_name() const final { return "archival_metadata_stm"; }
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

private:
ss::future<std::error_code> do_add_segments(
std::vector<cloud_storage::segment_meta>,
Expand All @@ -235,11 +238,11 @@ class archival_metadata_stm final : public persisted_stm<> {
ss::future<std::error_code>
do_replicate_commands(model::record_batch, ss::abort_source&);

ss::future<> apply(model::record_batch batch) override;
ss::future<> handle_raft_snapshot() override;
ss::future<> apply(const model::record_batch& batch) override;
ss::future<> apply_raft_snapshot(const iobuf&) override;

ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_snapshot() override;
ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_local_snapshot() override;

struct segment;
struct start_offset;
Expand Down Expand Up @@ -289,7 +292,7 @@ class archival_metadata_stm final : public persisted_stm<> {
ss::shared_ptr<cloud_storage::partition_manifest> _manifest;

// The offset of the last mark_clean_cmd applied: if the manifest is
// clean, this will equal _insync_offset.
// clean, this will equal last_applied_offset.
model::offset _last_clean_at;

// The offset of the last record that modified this stm
Expand Down
Loading
Loading