Skip to content

Commit

Permalink
r/configuration: make configuration serde serializable
Browse files Browse the repository at this point in the history
Added support for serde serialization in raft::configuration.
To determine if configuration should use `serde` or `adl` serialization
we peek for the configuration version. In serde version it will be
version from envelope while in adl serialized object it is going to be
an object version. This way we will be able to deserialize older
configuration types.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 3, 2023
1 parent 9e0cb80 commit b29235a
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 38 deletions.
6 changes: 3 additions & 3 deletions src/v/cluster/members_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "config/configuration.h"
#include "features/feature_table.h"
#include "model/metadata.h"
#include "raft/consensus_utils.h"
#include "raft/errc.h"
#include "raft/group_configuration.h"
#include "raft/types.h"
Expand Down Expand Up @@ -606,9 +607,8 @@ members_manager::apply_raft_configuration_batch(model::record_batch b) {
"raft configuration batches are expected to have exactly one record. "
"Current batch contains {} records",
b.record_count());

auto cfg = reflection::from_iobuf<raft::group_configuration>(
b.copy_records().front().release_value());
iobuf_parser parser(b.copy_records().front().release_value());
auto cfg = raft::details::deserialize_configuration(parser);

co_await handle_raft0_cfg_update(std::move(cfg), b.base_offset());

Expand Down
15 changes: 12 additions & 3 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include "raft/configuration_manager.h"

#include "bytes/iobuf_parser.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "raft/consensus_utils.h"
#include "raft/types.h"
#include "reflection/adl.h"
#include "serde/rw/rw.h"
#include "storage/api.h"
#include "storage/kvstore.h"
#include "vlog.h"
Expand Down Expand Up @@ -249,7 +251,12 @@ serialize_configurations(const configuration_manager::underlying_t& cfgs) {
cfgs.cbegin(),
cfgs.cend(),
[&ret](const auto& p) mutable {
reflection::serialize(ret, p.first, p.second.cfg);
reflection::serialize(ret, p.first);
if (p.second.cfg.version() >= group_configuration::v_6) {
serde::write(ret, p.second.cfg);
} else {
reflection::serialize(ret, p.second.cfg);
}
})
.then([&ret] { return std::move(ret); });
});
Expand All @@ -272,8 +279,10 @@ ss::future<configuration_manager::underlying_t> deserialize_configurations(
[&parser, &configs, initial](uint64_t i) mutable {
auto key = reflection::adl<model::offset>{}.from(
parser);
auto value = reflection::adl<group_configuration>{}
.from(parser);

auto value
= details::deserialize_nested_configuration(
parser);
auto [_, success] = configs.try_emplace(
key,
configuration_manager::indexed_configuration(
Expand Down
8 changes: 8 additions & 0 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2507,6 +2507,14 @@ ss::future<std::error_code> consensus::replicate_configuration(
return ss::with_gate(
_bg, [this, u = std::move(u), cfg = std::move(cfg)]() mutable {
maybe_upgrade_configuration_to_v4(cfg);
if (
cfg.version() == group_configuration::v_5
&& use_serde_configuration()) {
vlog(
_ctxlog.debug, "Upgrading configuration {} version to 6", cfg);
cfg.set_version(group_configuration::v_6);
}

auto batches = details::serialize_configuration_as_batches(
std::move(cfg));
for (auto& b : batches) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ class consensus {
model::offset leader_last_offset,
bool already_recovering);

bool use_serde_configuration() const {
return _features.is_active(features::feature::raft_config_serde);
}

// args
vnode _self;
raft::group_id _group;
Expand Down
43 changes: 34 additions & 9 deletions src/v/raft/consensus_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "random/generators.h"
#include "reflection/adl.h"
#include "resource_mgmt/io_priority.h"
#include "serde/rw/rw.h"
#include "ssx/future-util.h"
#include "storage/api.h"
#include "storage/fs_utils.h"
Expand Down Expand Up @@ -167,14 +168,22 @@ ss::future<configuration_bootstrap_state> read_bootstrap_state(
});
}

iobuf serialize_configuration(group_configuration cfg) {
if (likely(cfg.version() >= raft::group_configuration::v_6)) {
return serde::to_iobuf(std::move(cfg));
}

return reflection::to_iobuf(std::move(cfg));
}

ss::circular_buffer<model::record_batch>
serialize_configuration_as_batches(group_configuration cfg) {
auto batch = std::move(
storage::record_batch_builder(
model::record_batch_type::raft_configuration,
model::offset(0))
.add_raw_kv(iobuf(), reflection::to_iobuf(std::move(cfg))))
.build();
auto batch
= std::move(
storage::record_batch_builder(
model::record_batch_type::raft_configuration, model::offset(0))
.add_raw_kv(iobuf(), serialize_configuration(std::move(cfg))))
.build();
ss::circular_buffer<model::record_batch> batches;
batches.reserve(1);
batches.push_back(std::move(batch));
Expand Down Expand Up @@ -289,6 +298,22 @@ ss::future<> persist_snapshot(
});
});
}
group_configuration deserialize_configuration(iobuf_parser& parser) {
const auto version = serde::peek_version(parser);
if (likely(version >= group_configuration::v_6)) {
return serde::read<group_configuration>(parser);
}

return reflection::adl<group_configuration>{}.from(parser);
}
group_configuration deserialize_nested_configuration(iobuf_parser& parser) {
const auto version = serde::peek_version(parser);
if (likely(version >= group_configuration::v_6)) {
return serde::read_nested<group_configuration>(parser, 0UL);
}

return reflection::adl<group_configuration>{}.from(parser);
}

model::record_batch_reader make_config_extracting_reader(
model::offset base_offset,
Expand Down Expand Up @@ -352,9 +377,9 @@ model::record_batch_reader make_config_extracting_reader(
}

void extract_configuration(model::record_batch& batch) {
auto cfg = reflection::from_iobuf<group_configuration>(
batch.copy_records().begin()->value().copy());
_configurations.emplace_back(_next_offset, std::move(cfg));
iobuf_parser parser(batch.copy_records().begin()->release_value());
_configurations.emplace_back(
_next_offset, deserialize_configuration(parser));
}

private:
Expand Down
7 changes: 7 additions & 0 deletions src/v/raft/consensus_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ foreign_share_n(model::record_batch_reader&&, std::size_t);
ss::circular_buffer<model::record_batch>
serialize_configuration_as_batches(group_configuration cfg);

iobuf serialize_configuration(group_configuration cfg);
void write_configuration(group_configuration cfg, iobuf& out);

/// returns a fully parsed config state from a given storage log, starting at
/// given offset
ss::future<raft::configuration_bootstrap_state> read_bootstrap_state(
Expand All @@ -53,6 +56,10 @@ fragmented_vector<model::record_batch> make_ghost_batches_in_gaps(
ss::future<>
persist_snapshot(storage::simple_snapshot_manager&, snapshot_metadata, iobuf&&);

group_configuration deserialize_configuration(iobuf_parser&);

group_configuration deserialize_nested_configuration(iobuf_parser&);

/// looks up for the broker with request id in a vector of brokers
template<typename Iterator>
Iterator find_machine(Iterator begin, Iterator end, model::node_id id) {
Expand Down
45 changes: 40 additions & 5 deletions src/v/raft/group_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,9 @@ group_configuration::group_configuration(
group_nodes current,
model::revision_id revision,
std::optional<configuration_update> update,
std::optional<group_nodes> old)
: _version(current_version)
std::optional<group_nodes> old,
version_t version)
: _version(version)
, _current(std::move(current))
, _configuration_update(std::move(update))
, _old(std::move(old))
Expand Down Expand Up @@ -1259,11 +1260,29 @@ std::ostream& operator<<(std::ostream& o, configuration_state t) {
std::ostream& operator<<(std::ostream& o, const configuration_update& u) {
fmt::print(
o,
"{{to_add: {}, to_remove: {}}}",
"{{to_add: {}, to_remove: {}, learner_start_offset: {}}}",
u.replicas_to_add,
u.replicas_to_remove);
u.replicas_to_remove,
u.learner_start_offset);
return o;
}
group_configuration group_configuration::serde_direct_read(
iobuf_parser& p, const serde::header& h) {
auto current = serde::read_nested<group_nodes>(p, h._bytes_left_limit);
auto update = serde::read_nested<std::optional<configuration_update>>(
p, h._bytes_left_limit);
auto old = serde::read_nested<std::optional<group_nodes>>(
p, h._bytes_left_limit);
auto rev = serde::read_nested<model::revision_id>(p, h._bytes_left_limit);
return {std::move(current), rev, std::move(update), std::move(old)};
}
void group_configuration::serde_write(iobuf& out) {
using serde::write;
write(out, _current);
write(out, _configuration_update);
write(out, _old);
write(out, _revision);
}
} // namespace raft

namespace reflection {
Expand Down Expand Up @@ -1336,6 +1355,7 @@ adl<raft::group_configuration>::from(iobuf_parser& p) {
* version 3 - model::broker with multiple endpoints
* version 4 - persist configuration update request
* version 5 - no brokers
* version 6 - serde
*/

std::vector<model::broker> brokers;
Expand Down Expand Up @@ -1381,7 +1401,12 @@ adl<raft::group_configuration>::from(iobuf_parser& p) {
}
if (likely(version >= raft::group_configuration::v_5)) {
return {
std::move(current), revision, std::move(update), std::move(old)};
std::move(current),
revision,
std::move(update),
std::move(old),
// adl configuration is limited to v_5
raft::group_configuration::v_5};
} else {
raft::group_configuration cfg{
std::move(brokers),
Expand Down Expand Up @@ -1422,4 +1447,14 @@ adl<raft::configuration_update>::from(iobuf_parser& in) {
};
}

void adl<raft::group_nodes>::to(iobuf& buffer, raft::group_nodes n) {
reflection::serialize(buffer, n.voters, n.learners);
}
raft::group_nodes adl<raft::group_nodes>::from(iobuf_parser& p) {
auto voters = adl<std::vector<raft::vnode>>{}.from(p);
auto learners = adl<std::vector<raft::vnode>>{}.from(p);
return raft::group_nodes{
.voters = std::move(voters), .learners = std::move(learners)};
}

} // namespace reflection
30 changes: 26 additions & 4 deletions src/v/raft/group_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ enum class configuration_state : uint8_t { simple, transitional, joint };

std::ostream& operator<<(std::ostream& o, configuration_state t);

struct group_nodes {
struct group_nodes
: serde::envelope<group_nodes, serde::version<0>, serde::compat_version<0>> {
std::vector<vnode> voters;
std::vector<vnode> learners;

Expand All @@ -99,6 +100,8 @@ struct group_nodes {

friend std::ostream& operator<<(std::ostream&, const group_nodes&);
friend bool operator==(const group_nodes&, const group_nodes&) = default;

auto serde_fields() { return std::tie(voters, learners); }
};

struct configuration_update
Expand Down Expand Up @@ -129,7 +132,11 @@ struct configuration_update
friend std::ostream& operator<<(std::ostream&, const configuration_update&);
};

class group_configuration final {
class group_configuration
: public serde::envelope<
group_configuration,
serde::version<6>,
serde::compat_version<6>> {
public:
using version_t
= named_type<int8_t, struct raft_group_configuration_version>;
Expand All @@ -140,7 +147,11 @@ class group_configuration final {
static constexpr version_t v_4{4};
// simplified configuration, not serializing brokers field
static constexpr version_t v_5{5};
static constexpr version_t current_version = v_5;

// serde serialized configuration
static constexpr version_t v_6{6};

static constexpr version_t current_version = v_6;

/**
* creates a configuration where all provided brokers are current
Expand Down Expand Up @@ -175,7 +186,8 @@ class group_configuration final {
group_nodes,
model::revision_id,
std::optional<configuration_update>,
std::optional<group_nodes> = std::nullopt);
std::optional<group_nodes> = std::nullopt,
version_t version = current_version);

group_configuration(const group_configuration&) = default;
group_configuration(group_configuration&&) = default;
Expand Down Expand Up @@ -395,6 +407,11 @@ class group_configuration final {
**/
bool is_with_brokers() const { return _version < v_5; }

void serde_write(iobuf& out);

static group_configuration
serde_direct_read(iobuf_parser&, const serde::header&);

private:
friend class configuration_change_strategy_v3;

Expand Down Expand Up @@ -551,4 +568,9 @@ struct adl<raft::configuration_update> {
void to(iobuf&, raft::configuration_update);
raft::configuration_update from(iobuf_parser&);
};
template<>
struct adl<raft::group_nodes> {
void to(iobuf&, raft::group_nodes);
raft::group_nodes from(iobuf_parser&);
};
} // namespace reflection
17 changes: 17 additions & 0 deletions src/v/raft/tests/configuration_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,20 @@ FIXTURE_TEST(test_assigning_initial_revision, config_manager_fixture) {
BOOST_REQUIRE(
mgr.get_latest().contains(raft::vnode(model::node_id(1), new_revision)));
}

FIXTURE_TEST(test_mixed_configuration_versions, config_manager_fixture) {
auto cfg = random_configuration();
cfg.set_version(raft::group_configuration::v_5);
_cfg_mgr.add(model::offset(0), cfg).get0();
cfg = random_configuration();
cfg.set_version(raft::group_configuration::v_6);
_cfg_mgr.add(model::offset(1), cfg).get0();
cfg = random_configuration();
cfg.set_version(raft::group_configuration::v_6);
_cfg_mgr.add(model::offset(2), cfg).get0();
cfg = random_configuration();
cfg.set_version(raft::group_configuration::v_5);
_cfg_mgr.add(model::offset(3), cfg).get0();

validate_recovery();
}
Loading

0 comments on commit b29235a

Please sign in to comment.