Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chunked_fifo to retrieve deltas from controller_backend #11691

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -142,7 +143,7 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
co_return co_await get_reconciliation_state(std::move(ntps));
}

ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
controller_api::get_remote_core_deltas(model::ntp ntp, ss::shard_id shard) {
return _backend.invoke_on(
shard, [ntp = std::move(ntp)](controller_backend& backend) {
Expand All @@ -164,7 +165,7 @@ controller_api::get_reconciliation_state(model::ntp ntp) {
std::move(ntp), errc::partition_not_exists);
}
// query controller backends for in progress operations
std::vector<backend_operation> ops;
ss::chunked_fifo<backend_operation> ops;
const auto shards = boost::irange<ss::shard_id>(0, ss::smp::count);
for (auto shard : shards) {
auto local_deltas = co_await get_remote_core_deltas(ntp, shard);
Expand Down Expand Up @@ -268,7 +269,7 @@ controller_api::get_reconciliation_state(
}
vassert(result.value().size() == 1, "result MUST contain single ntp");

return ret_t(result.value().front());
return ret_t(std::move(result.value().front()));
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "seastarx.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>
Expand Down Expand Up @@ -88,7 +89,7 @@ class controller_api {
std::optional<ss::shard_id> shard_for(const model::ntp& ntp) const;

private:
ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
get_remote_core_deltas(model::ntp, ss::shard_id);

model::node_id _self;
Expand Down
10 changes: 7 additions & 3 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>
#include <fmt/ranges.h>
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved

#include <algorithm>
#include <exception>
Expand Down Expand Up @@ -589,7 +590,6 @@ controller_backend::deltas_t calculate_bootstrap_deltas(

// *it is not included
auto start = it.base();
result_delta.reserve(std::distance(start, deltas.end()));
std::move(start, deltas.end(), std::back_inserter(result_delta));
return result_delta;
}
Expand Down Expand Up @@ -2020,10 +2020,14 @@ ss::future<> controller_backend::delete_partition(
co_await _partition_manager.local().remove(ntp, mode);
}

std::vector<controller_backend::delta_metadata>
ss::chunked_fifo<controller_backend::delta_metadata>
controller_backend::list_ntp_deltas(const model::ntp& ntp) const {
if (auto it = _topic_deltas.find(ntp); it != _topic_deltas.end()) {
return it->second;
ss::chunked_fifo<controller_backend::delta_metadata> ret;
ret.reserve(it->second.size());
std::copy(
it->second.begin(), it->second.end(), std::back_inserter(ret));
return ret;
}

return {};
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/btree_map.h>
#include <absl/container/node_hash_map.h>

#include <cstdint>
#include <deque>
#include <ostream>

namespace cluster {
Expand Down Expand Up @@ -231,7 +233,7 @@ class controller_backend
friend std::ostream& operator<<(std::ostream&, const delta_metadata&);
};

using deltas_t = std::vector<delta_metadata>;
using deltas_t = std::deque<delta_metadata>;
using results_t = std::vector<std::error_code>;
controller_backend(
ss::sharded<cluster::topic_table>&,
Expand All @@ -247,7 +249,7 @@ class controller_backend
ss::future<> stop();
ss::future<> start();

std::vector<delta_metadata> list_ntp_deltas(const model::ntp&) const;
ss::chunked_fifo<delta_metadata> list_ntp_deltas(const model::ntp&) const;

private:
struct cross_shard_move_request {
Expand Down Expand Up @@ -437,6 +439,6 @@ class controller_backend
ss::metrics::metric_groups _metrics;
};

std::vector<controller_backend::delta_metadata> calculate_bootstrap_deltas(
model::node_id self, const std::vector<controller_backend::delta_metadata>&);
controller_backend::deltas_t calculate_bootstrap_deltas(
model::node_id self, const controller_backend::deltas_t&);
} // namespace cluster
18 changes: 11 additions & 7 deletions src/v/cluster/tests/controller_backend_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ make_assignment(std::vector<model::broker_shard> replicas) {
using op_t = cluster::topic_table::delta::op_type;
using delta_t = cluster::topic_table::delta;
using meta_t = cluster::controller_backend::delta_metadata;
using deltas_t = std::vector<meta_t>;
using deltas_t = std::deque<meta_t>;

meta_t make_delta(
int64_t o,
Expand Down Expand Up @@ -142,11 +142,14 @@ SEASTAR_THREAD_TEST_CASE(update_including_current_node) {
current_node, std::move(d_1));

BOOST_REQUIRE_EQUAL(deltas.size(), 3);
BOOST_REQUIRE_EQUAL(deltas[0].delta.offset, recreate_current.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[1].delta.offset, update_with_current.delta.offset);
deltas.begin()->delta.offset, recreate_current.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[2].delta.offset, finish_update_with_current.delta.offset);
std::next(deltas.begin())->delta.offset,
update_with_current.delta.offset);
BOOST_REQUIRE_EQUAL(
std::next(deltas.begin(), 2)->delta.offset,
finish_update_with_current.delta.offset);
}

SEASTAR_THREAD_TEST_CASE(update_excluding_current_node) {
Expand Down Expand Up @@ -196,9 +199,10 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node) {

BOOST_REQUIRE_EQUAL(deltas.size(), 2);
BOOST_REQUIRE_EQUAL(
deltas[0].delta.offset, update_with_current_2.delta.offset);
deltas.begin()->delta.offset, update_with_current_2.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[1].delta.offset, finish_update_with_current_2.delta.offset);
std::next(deltas.begin())->delta.offset,
finish_update_with_current_2.delta.offset);
}

SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) {
Expand All @@ -218,5 +222,5 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) {

BOOST_REQUIRE_EQUAL(deltas.size(), 1);
BOOST_REQUIRE_EQUAL(
deltas[0].delta.offset, update_with_current_2.delta.offset);
deltas.begin()->delta.offset, update_with_current_2.delta.offset);
}
20 changes: 10 additions & 10 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,44 +1428,44 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}

cluster::ntp_reconciliation_state data{
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
std::vector<cluster::ntp_reconciliation_state> results;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int j = 0, mj = random_generators::get_int(10); j < mj; j++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}
results.push_back(cluster::ntp_reconciliation_state{
results.emplace_back(
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
});
cluster::errc::feature_disabled);
}
cluster::reconciliation_state_reply data{
.results = results,
.results = std::move(results),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::create_acls_cmd_data create_acls_data{};
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ topic_table_delta::get_replica_revision(model::node_id replica) const {

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status)
: ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, errc::success) {}

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status,
errc ec)
: _ntp(std::move(ntp))
Expand Down Expand Up @@ -1546,12 +1546,11 @@ void adl<cluster::ntp_reconciliation_state>::to(
cluster::ntp_reconciliation_state
adl<cluster::ntp_reconciliation_state>::from(iobuf_parser& in) {
auto ntp = adl<model::ntp>{}.from(in);
auto ops = adl<std::vector<cluster::backend_operation>>{}.from(in);
auto ops = adl<ss::chunked_fifo<cluster::backend_operation>>{}.from(in);
auto status = adl<cluster::reconciliation_status>{}.from(in);
auto error = adl<cluster::errc>{}.from(in);

return cluster::ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, error);
return {std::move(ntp), std::move(ops), status, error};
}

void adl<cluster::create_partitions_configuration>::to(
Expand Down
61 changes: 50 additions & 11 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2763,23 +2763,23 @@ class ntp_reconciliation_state

// success case
ntp_reconciliation_state(
model::ntp, std::vector<backend_operation>, reconciliation_status);
model::ntp, ss::chunked_fifo<backend_operation>, reconciliation_status);

// error
ntp_reconciliation_state(model::ntp, cluster::errc);

ntp_reconciliation_state(
model::ntp,
std::vector<backend_operation>,
ss::chunked_fifo<backend_operation>,
reconciliation_status,
cluster::errc);

const model::ntp& ntp() const { return _ntp; }
const std::vector<backend_operation>& pending_operations() const {
const ss::chunked_fifo<backend_operation>& pending_operations() const {
return _backend_operations;
}

std::vector<backend_operation>& pending_operations() {
ss::chunked_fifo<backend_operation>& pending_operations() {
return _backend_operations;
}

Expand All @@ -2788,9 +2788,28 @@ class ntp_reconciliation_state
std::error_code error() const { return make_error_code(_error); }
errc cluster_errc() const { return _error; }

friend bool
operator==(const ntp_reconciliation_state&, const ntp_reconciliation_state&)
= default;
friend bool operator==(
const ntp_reconciliation_state& lhs,
const ntp_reconciliation_state& rhs) {
return lhs._ntp == rhs._ntp && lhs._status == rhs._status
&& lhs._error == rhs._error
&& lhs._backend_operations.size()
== rhs._backend_operations.size()
&& std::equal(
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
lhs._backend_operations.begin(),
lhs._backend_operations.end(),
rhs._backend_operations.begin());
};

ntp_reconciliation_state copy() const {
ss::chunked_fifo<backend_operation> backend_operations;
backend_operations.reserve(_backend_operations.size());
std::copy(
_backend_operations.begin(),
_backend_operations.end(),
std::back_inserter(backend_operations));
return {_ntp, std::move(backend_operations), _status, _error};
}

friend std::ostream&
operator<<(std::ostream&, const ntp_reconciliation_state&);
Expand All @@ -2801,19 +2820,29 @@ class ntp_reconciliation_state

private:
model::ntp _ntp;
std::vector<backend_operation> _backend_operations;
ss::chunked_fifo<backend_operation> _backend_operations;
reconciliation_status _status;
errc _error;
};

struct node_backend_operations {
node_backend_operations(
model::node_id id, std::vector<backend_operation> ops)
model::node_id id, ss::chunked_fifo<backend_operation> ops)
: node_id(id)
, backend_operations(std::move(ops)) {}

model::node_id node_id;
std::vector<backend_operation> backend_operations;
ss::chunked_fifo<backend_operation> backend_operations;

node_backend_operations copy() const {
ss::chunked_fifo<backend_operation> b_ops;
b_ops.reserve(backend_operations.size());
std::copy(
backend_operations.begin(),
backend_operations.end(),
std::back_inserter(b_ops));
return {node_id, std::move(b_ops)};
}
};

struct node_error {
Expand All @@ -2826,7 +2855,7 @@ struct node_error {
};

struct global_reconciliation_state {
absl::node_hash_map<model::ntp, std::vector<node_backend_operations>>
absl::node_hash_map<model::ntp, ss::chunked_fifo<node_backend_operations>>
ntp_backend_operations;
std::vector<node_error> node_errors;
};
Expand Down Expand Up @@ -2868,6 +2897,16 @@ struct reconciliation_state_reply
return o;
}

reconciliation_state_reply copy() const {
std::vector<ntp_reconciliation_state> results_cp;
results_cp.reserve(results.size());
for (auto& r : results) {
results_cp.push_back(r.copy());
}

return reconciliation_state_reply{.results = std::move(results_cp)};
}

auto serde_fields() { return std::tie(results); }
};

Expand Down
Loading