Skip to content

Commit

Permalink
raft: get rid of config_extracting_reader
Browse files Browse the repository at this point in the history
Extract configurations using a wrapping batch consumer instead.
  • Loading branch information
ztlpn committed Apr 16, 2024
1 parent 9342811 commit 2f432c2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 115 deletions.
78 changes: 0 additions & 78 deletions src/v/raft/consensus_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,84 +230,6 @@ group_configuration deserialize_nested_configuration(iobuf_parser& parser) {
return reflection::adl<group_configuration>{}.from(parser);
}

model::record_batch_reader make_config_extracting_reader(
model::offset base_offset,
std::vector<offset_configuration>& target,
model::record_batch_reader&& source) {
class extracting_reader final : public model::record_batch_reader::impl {
private:
using storage_t = model::record_batch_reader::storage_t;
using data_t = model::record_batch_reader::data_t;
using foreign_t = model::record_batch_reader::foreign_data_t;

public:
explicit extracting_reader(
model::offset o,
std::vector<offset_configuration>& target,
std::unique_ptr<model::record_batch_reader::impl> src)
: _next_offset(
o < model::offset(0) ? model::offset(0) : o + model::offset(1))
, _configurations(target)
, _ptr(std::move(src)) {}
extracting_reader(const extracting_reader&) = delete;
extracting_reader& operator=(const extracting_reader&) = delete;
extracting_reader(extracting_reader&&) = delete;
extracting_reader& operator=(extracting_reader&&) = delete;
~extracting_reader() override = default;

bool is_end_of_stream() const final {
// ok to copy a bool
return _ptr->is_end_of_stream();
}

void print(std::ostream& os) final {
fmt::print(os, "configuration extracting reader, proxy for ");
_ptr->print(os);
}

data_t& get_batches(storage_t& st) {
if (std::holds_alternative<data_t>(st)) {
return std::get<data_t>(st);
} else {
return *std::get<foreign_t>(st).buffer;
}
}

ss::future<storage_t>
do_load_slice(model::timeout_clock::time_point t) final {
return _ptr->do_load_slice(t).then([this](storage_t recs) {
for (auto& batch : get_batches(recs)) {
if (
batch.header().type
== model::record_batch_type::raft_configuration) {
extract_configuration(batch);
}
// calculate next offset
_next_offset += model::offset(
batch.header().last_offset_delta)
+ model::offset(1);
}
return recs;
});
}

void extract_configuration(model::record_batch& batch) {
iobuf_parser parser(batch.copy_records().begin()->release_value());
_configurations.emplace_back(
_next_offset, deserialize_configuration(parser));
}

private:
model::offset _next_offset;
std::vector<offset_configuration>& _configurations;
std::unique_ptr<model::record_batch_reader::impl> _ptr;
};
auto reader = std::make_unique<extracting_reader>(
base_offset, target, std::move(source).release());

return model::record_batch_reader(std::move(reader));
}

bytes serialize_group_key(raft::group_id group, metadata_key key_type) {
iobuf buf;
reflection::serialize(buf, key_type, group);
Expand Down
62 changes: 37 additions & 25 deletions src/v/raft/consensus_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,6 @@ class do_for_each_batch_consumer {
Func _f;
};

/**
* Extracts all configurations from underlying reader. Configuration are stored
* in a vector passed as a reference to reader. The reader can will
* automatically assing offsets to following batches using provided base offset
* as a staring point
*/
model::record_batch_reader make_config_extracting_reader(
model::offset,
std::vector<offset_configuration>&,
model::record_batch_reader&&);

/**
* Function that allow consuming batches with given consumer while lazily
* extracting raft::group_configuration from the reader.
Expand All @@ -152,20 +141,43 @@ auto for_each_ref_extract_configuration(
model::record_batch_reader&& rdr,
ReferenceConsumer c,
model::timeout_clock::time_point tm) {
using conf_t = std::vector<offset_configuration>;

return ss::do_with(
conf_t{},
[tm, c = std::move(c), base_offset, rdr = std::move(rdr)](
conf_t& configurations) mutable {
return make_config_extracting_reader(
base_offset, configurations, std::move(rdr))
.for_each_ref(std::move(c), tm)
.then([&configurations](auto res) {
return std::make_tuple(
std::move(res), std::move(configurations));
});
});
struct extracting_consumer {
ss::future<ss::stop_iteration> operator()(model::record_batch& batch) {
if (
batch.header().type
== model::record_batch_type::raft_configuration) {
iobuf_parser parser(
batch.copy_records().begin()->release_value());
configurations.emplace_back(
next_offset, deserialize_configuration(parser));
}

// we have to calculate offsets manually because the batch may not
// yet have the base offset assigned.
next_offset += model::offset(batch.header().last_offset_delta)
+ model::offset(1);

return wrapped(batch);
}

auto end_of_stream() {
return ss::futurize_invoke(
[this] { return wrapped.end_of_stream(); })
.then([confs = std::move(configurations)](auto ret) mutable {
return std::make_tuple(std::move(ret), std::move(confs));
});
}

ReferenceConsumer wrapped;
model::offset next_offset;
std::vector<offset_configuration> configurations;
};

return std::move(rdr).for_each_ref(
extracting_consumer{
.wrapped = std::move(c),
.next_offset = model::next_offset(base_offset)},
tm);
}

bytes serialize_group_key(raft::group_id, metadata_key);
Expand Down
23 changes: 11 additions & 12 deletions src/v/raft/tests/foreign_entry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,17 @@ struct foreign_entry_fixture {

ss::future<raft::group_configuration>
extract_configuration(model::record_batch_reader&& rdr) {
using cfgs_t = std::vector<raft::offset_configuration>;
return ss::do_with(cfgs_t{}, [rdr = std::move(rdr)](cfgs_t& cfgs) mutable {
auto wrapping_rdr = raft::details::make_config_extracting_reader(
model::offset(0), cfgs, std::move(rdr));

return model::consume_reader_to_memory(
std::move(wrapping_rdr), model::no_timeout)
.then([&cfgs](ss::circular_buffer<model::record_batch>) {
BOOST_REQUIRE(!cfgs.empty());
return cfgs.begin()->cfg;
});
});
struct noop_consumer {
ss::future<ss::stop_iteration> operator()(model::record_batch&) {
co_return ss::stop_iteration::no;
}
int end_of_stream() { return 0; }
};

auto [_, cfgs] = co_await raft::details::for_each_ref_extract_configuration(
model::offset(0), std::move(rdr), noop_consumer{}, model::no_timeout);
BOOST_REQUIRE(!cfgs.empty());
co_return cfgs.begin()->cfg;
}

FIXTURE_TEST(sharing_one_reader, foreign_entry_fixture) {
Expand Down

0 comments on commit 2f432c2

Please sign in to comment.