Skip to content

Commit

Permalink
Merge pull request #11691 from mmaslankaprv/fix-11673
Browse files Browse the repository at this point in the history
Use `chunked_fifo` to retrieve deltas from `controller_backend`
  • Loading branch information
mmaslankaprv authored Jul 10, 2023
2 parents 1cd0828 + fddcb30 commit 3537064
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 55 deletions.
7 changes: 4 additions & 3 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -142,7 +143,7 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
co_return co_await get_reconciliation_state(std::move(ntps));
}

ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
controller_api::get_remote_core_deltas(model::ntp ntp, ss::shard_id shard) {
return _backend.invoke_on(
shard, [ntp = std::move(ntp)](controller_backend& backend) {
Expand All @@ -164,7 +165,7 @@ controller_api::get_reconciliation_state(model::ntp ntp) {
std::move(ntp), errc::partition_not_exists);
}
// query controller backends for in progress operations
std::vector<backend_operation> ops;
ss::chunked_fifo<backend_operation> ops;
const auto shards = boost::irange<ss::shard_id>(0, ss::smp::count);
for (auto shard : shards) {
auto local_deltas = co_await get_remote_core_deltas(ntp, shard);
Expand Down Expand Up @@ -268,7 +269,7 @@ controller_api::get_reconciliation_state(
}
vassert(result.value().size() == 1, "result MUST contain single ntp");

return ret_t(result.value().front());
return ret_t(std::move(result.value().front()));
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "seastarx.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>
Expand Down Expand Up @@ -88,7 +89,7 @@ class controller_api {
std::optional<ss::shard_id> shard_for(const model::ntp& ntp) const;

private:
ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
get_remote_core_deltas(model::ntp, ss::shard_id);

model::node_id _self;
Expand Down
10 changes: 7 additions & 3 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>
#include <fmt/ranges.h>

#include <algorithm>
#include <exception>
Expand Down Expand Up @@ -589,7 +590,6 @@ controller_backend::deltas_t calculate_bootstrap_deltas(

// *it is not included
auto start = it.base();
result_delta.reserve(std::distance(start, deltas.end()));
std::move(start, deltas.end(), std::back_inserter(result_delta));
return result_delta;
}
Expand Down Expand Up @@ -2017,10 +2017,14 @@ ss::future<> controller_backend::delete_partition(
co_await _partition_manager.local().remove(ntp, mode);
}

std::vector<controller_backend::delta_metadata>
ss::chunked_fifo<controller_backend::delta_metadata>
controller_backend::list_ntp_deltas(const model::ntp& ntp) const {
if (auto it = _topic_deltas.find(ntp); it != _topic_deltas.end()) {
return it->second;
ss::chunked_fifo<controller_backend::delta_metadata> ret;
ret.reserve(it->second.size());
std::copy(
it->second.begin(), it->second.end(), std::back_inserter(ret));
return ret;
}

return {};
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/btree_map.h>
#include <absl/container/node_hash_map.h>

#include <cstdint>
#include <deque>
#include <ostream>

namespace cluster {
Expand Down Expand Up @@ -231,7 +233,7 @@ class controller_backend
friend std::ostream& operator<<(std::ostream&, const delta_metadata&);
};

using deltas_t = std::vector<delta_metadata>;
using deltas_t = std::deque<delta_metadata>;
using results_t = std::vector<std::error_code>;
controller_backend(
ss::sharded<cluster::topic_table>&,
Expand All @@ -247,7 +249,7 @@ class controller_backend
ss::future<> stop();
ss::future<> start();

std::vector<delta_metadata> list_ntp_deltas(const model::ntp&) const;
ss::chunked_fifo<delta_metadata> list_ntp_deltas(const model::ntp&) const;

private:
struct cross_shard_move_request {
Expand Down Expand Up @@ -437,6 +439,6 @@ class controller_backend
ss::metrics::metric_groups _metrics;
};

std::vector<controller_backend::delta_metadata> calculate_bootstrap_deltas(
model::node_id self, const std::vector<controller_backend::delta_metadata>&);
controller_backend::deltas_t calculate_bootstrap_deltas(
model::node_id self, const controller_backend::deltas_t&);
} // namespace cluster
18 changes: 11 additions & 7 deletions src/v/cluster/tests/controller_backend_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ make_assignment(std::vector<model::broker_shard> replicas) {
using op_t = cluster::topic_table::delta::op_type;
using delta_t = cluster::topic_table::delta;
using meta_t = cluster::controller_backend::delta_metadata;
using deltas_t = std::vector<meta_t>;
using deltas_t = std::deque<meta_t>;

meta_t make_delta(
int64_t o,
Expand Down Expand Up @@ -142,11 +142,14 @@ SEASTAR_THREAD_TEST_CASE(update_including_current_node) {
current_node, std::move(d_1));

BOOST_REQUIRE_EQUAL(deltas.size(), 3);
BOOST_REQUIRE_EQUAL(deltas[0].delta.offset, recreate_current.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[1].delta.offset, update_with_current.delta.offset);
deltas.begin()->delta.offset, recreate_current.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[2].delta.offset, finish_update_with_current.delta.offset);
std::next(deltas.begin())->delta.offset,
update_with_current.delta.offset);
BOOST_REQUIRE_EQUAL(
std::next(deltas.begin(), 2)->delta.offset,
finish_update_with_current.delta.offset);
}

SEASTAR_THREAD_TEST_CASE(update_excluding_current_node) {
Expand Down Expand Up @@ -196,9 +199,10 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node) {

BOOST_REQUIRE_EQUAL(deltas.size(), 2);
BOOST_REQUIRE_EQUAL(
deltas[0].delta.offset, update_with_current_2.delta.offset);
deltas.begin()->delta.offset, update_with_current_2.delta.offset);
BOOST_REQUIRE_EQUAL(
deltas[1].delta.offset, finish_update_with_current_2.delta.offset);
std::next(deltas.begin())->delta.offset,
finish_update_with_current_2.delta.offset);
}

SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) {
Expand All @@ -218,5 +222,5 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) {

BOOST_REQUIRE_EQUAL(deltas.size(), 1);
BOOST_REQUIRE_EQUAL(
deltas[0].delta.offset, update_with_current_2.delta.offset);
deltas.begin()->delta.offset, update_with_current_2.delta.offset);
}
20 changes: 10 additions & 10 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,44 +1428,44 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}

cluster::ntp_reconciliation_state data{
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
std::vector<cluster::ntp_reconciliation_state> results;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int j = 0, mj = random_generators::get_int(10); j < mj; j++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}
results.push_back(cluster::ntp_reconciliation_state{
results.emplace_back(
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
});
cluster::errc::feature_disabled);
}
cluster::reconciliation_state_reply data{
.results = results,
.results = std::move(results),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::create_acls_cmd_data create_acls_data{};
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ topic_table_delta::get_replica_revision(model::node_id replica) const {

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status)
: ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, errc::success) {}

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status,
errc ec)
: _ntp(std::move(ntp))
Expand Down Expand Up @@ -1546,12 +1546,11 @@ void adl<cluster::ntp_reconciliation_state>::to(
cluster::ntp_reconciliation_state
adl<cluster::ntp_reconciliation_state>::from(iobuf_parser& in) {
auto ntp = adl<model::ntp>{}.from(in);
auto ops = adl<std::vector<cluster::backend_operation>>{}.from(in);
auto ops = adl<ss::chunked_fifo<cluster::backend_operation>>{}.from(in);
auto status = adl<cluster::reconciliation_status>{}.from(in);
auto error = adl<cluster::errc>{}.from(in);

return cluster::ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, error);
return {std::move(ntp), std::move(ops), status, error};
}

void adl<cluster::create_partitions_configuration>::to(
Expand Down
61 changes: 50 additions & 11 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2763,23 +2763,23 @@ class ntp_reconciliation_state

// success case
ntp_reconciliation_state(
model::ntp, std::vector<backend_operation>, reconciliation_status);
model::ntp, ss::chunked_fifo<backend_operation>, reconciliation_status);

// error
ntp_reconciliation_state(model::ntp, cluster::errc);

ntp_reconciliation_state(
model::ntp,
std::vector<backend_operation>,
ss::chunked_fifo<backend_operation>,
reconciliation_status,
cluster::errc);

const model::ntp& ntp() const { return _ntp; }
const std::vector<backend_operation>& pending_operations() const {
const ss::chunked_fifo<backend_operation>& pending_operations() const {
return _backend_operations;
}

std::vector<backend_operation>& pending_operations() {
ss::chunked_fifo<backend_operation>& pending_operations() {
return _backend_operations;
}

Expand All @@ -2788,9 +2788,28 @@ class ntp_reconciliation_state
std::error_code error() const { return make_error_code(_error); }
errc cluster_errc() const { return _error; }

friend bool
operator==(const ntp_reconciliation_state&, const ntp_reconciliation_state&)
= default;
friend bool operator==(
const ntp_reconciliation_state& lhs,
const ntp_reconciliation_state& rhs) {
return lhs._ntp == rhs._ntp && lhs._status == rhs._status
&& lhs._error == rhs._error
&& lhs._backend_operations.size()
== rhs._backend_operations.size()
&& std::equal(
lhs._backend_operations.begin(),
lhs._backend_operations.end(),
rhs._backend_operations.begin());
};

ntp_reconciliation_state copy() const {
ss::chunked_fifo<backend_operation> backend_operations;
backend_operations.reserve(_backend_operations.size());
std::copy(
_backend_operations.begin(),
_backend_operations.end(),
std::back_inserter(backend_operations));
return {_ntp, std::move(backend_operations), _status, _error};
}

friend std::ostream&
operator<<(std::ostream&, const ntp_reconciliation_state&);
Expand All @@ -2801,19 +2820,29 @@ class ntp_reconciliation_state

private:
model::ntp _ntp;
std::vector<backend_operation> _backend_operations;
ss::chunked_fifo<backend_operation> _backend_operations;
reconciliation_status _status;
errc _error;
};

struct node_backend_operations {
node_backend_operations(
model::node_id id, std::vector<backend_operation> ops)
model::node_id id, ss::chunked_fifo<backend_operation> ops)
: node_id(id)
, backend_operations(std::move(ops)) {}

model::node_id node_id;
std::vector<backend_operation> backend_operations;
ss::chunked_fifo<backend_operation> backend_operations;

node_backend_operations copy() const {
ss::chunked_fifo<backend_operation> b_ops;
b_ops.reserve(backend_operations.size());
std::copy(
backend_operations.begin(),
backend_operations.end(),
std::back_inserter(b_ops));
return {node_id, std::move(b_ops)};
}
};

struct node_error {
Expand All @@ -2826,7 +2855,7 @@ struct node_error {
};

struct global_reconciliation_state {
absl::node_hash_map<model::ntp, std::vector<node_backend_operations>>
absl::node_hash_map<model::ntp, ss::chunked_fifo<node_backend_operations>>
ntp_backend_operations;
std::vector<node_error> node_errors;
};
Expand Down Expand Up @@ -2868,6 +2897,16 @@ struct reconciliation_state_reply
return o;
}

reconciliation_state_reply copy() const {
std::vector<ntp_reconciliation_state> results_cp;
results_cp.reserve(results.size());
for (auto& r : results) {
results_cp.push_back(r.copy());
}

return reconciliation_state_reply{.results = std::move(results_cp)};
}

auto serde_fields() { return std::tie(results); }
};

Expand Down
Loading

0 comments on commit 3537064

Please sign in to comment.