Skip to content

Commit

Permalink
c/api: use chunked_fifo to retrieve deltas from controller_backend
Browse files Browse the repository at this point in the history
Use `ss::chunked_fifo` to return the deltas processed by the controller
backend. The previously used `std::vector` could lead to large allocations,
because it stores all of its elements in a single contiguous chunk of memory.

Fixes: #11673

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv authored and Michal Maslanka committed Jul 4, 2023
1 parent 6ec20d3 commit c4826fd
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 44 deletions.
7 changes: 4 additions & 3 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -142,7 +143,7 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
co_return co_await get_reconciliation_state(std::move(ntps));
}

ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
controller_api::get_remote_core_deltas(model::ntp ntp, ss::shard_id shard) {
return _backend.invoke_on(
shard, [ntp = std::move(ntp)](controller_backend& backend) {
Expand All @@ -164,7 +165,7 @@ controller_api::get_reconciliation_state(model::ntp ntp) {
std::move(ntp), errc::partition_not_exists);
}
// query controller backends for in progress operations
std::vector<backend_operation> ops;
ss::chunked_fifo<backend_operation> ops;
const auto shards = boost::irange<ss::shard_id>(0, ss::smp::count);
for (auto shard : shards) {
auto local_deltas = co_await get_remote_core_deltas(ntp, shard);
Expand Down Expand Up @@ -268,7 +269,7 @@ controller_api::get_reconciliation_state(
}
vassert(result.value().size() == 1, "result MUST contain single ntp");

return ret_t(result.value().front());
return ret_t(std::move(result.value().front()));
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "seastarx.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>
Expand Down Expand Up @@ -92,7 +93,7 @@ class controller_api {
absl::node_hash_map<model::node_id, std::vector<model::ntp>>,
model::timeout_clock::time_point);

ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
get_remote_core_deltas(model::ntp, ss::shard_id);

model::node_id _self;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2020,10 +2020,10 @@ ss::future<> controller_backend::delete_partition(
co_await _partition_manager.local().remove(ntp, mode);
}

std::vector<controller_backend::delta_metadata>
ss::chunked_fifo<controller_backend::delta_metadata>
controller_backend::list_ntp_deltas(const model::ntp& ntp) const {
if (auto it = _topic_deltas.find(ntp); it != _topic_deltas.end()) {
std::vector<controller_backend::delta_metadata> ret;
ss::chunked_fifo<controller_backend::delta_metadata> ret;
ret.reserve(it->second.size());
std::copy(
it->second.begin(), it->second.end(), std::back_inserter(ret));
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>

Expand Down Expand Up @@ -248,7 +249,7 @@ class controller_backend
ss::future<> stop();
ss::future<> start();

std::vector<delta_metadata> list_ntp_deltas(const model::ntp&) const;
ss::chunked_fifo<delta_metadata> list_ntp_deltas(const model::ntp&) const;

private:
struct cross_shard_move_request {
Expand Down
20 changes: 10 additions & 10 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,44 +1428,44 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}

cluster::ntp_reconciliation_state data{
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
std::vector<cluster::ntp_reconciliation_state> results;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
std::vector<cluster::backend_operation> backend_operations;
ss::chunked_fifo<cluster::backend_operation> backend_operations;
for (int j = 0, mj = random_generators::get_int(10); j < mj; j++) {
backend_operations.push_back(cluster::backend_operation{
.source_shard = random_generators::get_int<unsigned>(1000),
.p_as = random_partition_assignments().front(),
.type = cluster::topic_table_delta::op_type::del,
});
}
results.push_back(cluster::ntp_reconciliation_state{
results.emplace_back(
model::random_ntp(),
backend_operations,
std::move(backend_operations),
cluster::reconciliation_status::error,
cluster::errc::feature_disabled,
});
cluster::errc::feature_disabled);
}
cluster::reconciliation_state_reply data{
.results = results,
.results = std::move(results),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::create_acls_cmd_data create_acls_data{};
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ topic_table_delta::get_replica_revision(model::node_id replica) const {

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status)
: ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, errc::success) {}

ntp_reconciliation_state::ntp_reconciliation_state(
model::ntp ntp,
std::vector<backend_operation> ops,
ss::chunked_fifo<backend_operation> ops,
reconciliation_status status,
errc ec)
: _ntp(std::move(ntp))
Expand Down Expand Up @@ -1546,12 +1546,11 @@ void adl<cluster::ntp_reconciliation_state>::to(
cluster::ntp_reconciliation_state
adl<cluster::ntp_reconciliation_state>::from(iobuf_parser& in) {
auto ntp = adl<model::ntp>{}.from(in);
auto ops = adl<std::vector<cluster::backend_operation>>{}.from(in);
auto ops = adl<ss::chunked_fifo<cluster::backend_operation>>{}.from(in);
auto status = adl<cluster::reconciliation_status>{}.from(in);
auto error = adl<cluster::errc>{}.from(in);

return cluster::ntp_reconciliation_state(
std::move(ntp), std::move(ops), status, error);
return {std::move(ntp), std::move(ops), status, error};
}

void adl<cluster::create_partitions_configuration>::to(
Expand Down
61 changes: 50 additions & 11 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2763,23 +2763,23 @@ class ntp_reconciliation_state

// success case
ntp_reconciliation_state(
model::ntp, std::vector<backend_operation>, reconciliation_status);
model::ntp, ss::chunked_fifo<backend_operation>, reconciliation_status);

// error
ntp_reconciliation_state(model::ntp, cluster::errc);

ntp_reconciliation_state(
model::ntp,
std::vector<backend_operation>,
ss::chunked_fifo<backend_operation>,
reconciliation_status,
cluster::errc);

const model::ntp& ntp() const { return _ntp; }
const std::vector<backend_operation>& pending_operations() const {
const ss::chunked_fifo<backend_operation>& pending_operations() const {
return _backend_operations;
}

std::vector<backend_operation>& pending_operations() {
ss::chunked_fifo<backend_operation>& pending_operations() {
return _backend_operations;
}

Expand All @@ -2788,9 +2788,28 @@ class ntp_reconciliation_state
std::error_code error() const { return make_error_code(_error); }
errc cluster_errc() const { return _error; }

friend bool
operator==(const ntp_reconciliation_state&, const ntp_reconciliation_state&)
= default;
/// Equality: two states match when their ntp, status and error code agree
/// and their pending backend operations compare equal element by element.
friend bool operator==(
  const ntp_reconciliation_state& lhs,
  const ntp_reconciliation_state& rhs) {
    // Members are compared in the same order as before, bailing out on the
    // first mismatch.
    if (!(lhs._ntp == rhs._ntp)) {
        return false;
    }
    if (!(lhs._status == rhs._status)) {
        return false;
    }
    if (!(lhs._error == rhs._error)) {
        return false;
    }

    const auto& lhs_ops = lhs._backend_operations;
    const auto& rhs_ops = rhs._backend_operations;
    // The three-iterator std::equal assumes the second range is at least as
    // long as the first, so the sizes have to be checked up front.
    if (!(lhs_ops.size() == rhs_ops.size())) {
        return false;
    }
    return std::equal(lhs_ops.begin(), lhs_ops.end(), rhs_ops.begin());
}

/// Returns an explicit copy of this state. The pending backend operations
/// are copied one by one into a newly built chunked_fifo; ntp, status and
/// error are passed through to the constructor unchanged.
ntp_reconciliation_state copy() const {
    ss::chunked_fifo<backend_operation> ops_copy;
    ops_copy.reserve(_backend_operations.size());
    for (const auto& op : _backend_operations) {
        ops_copy.push_back(op);
    }
    return {_ntp, std::move(ops_copy), _status, _error};
}

friend std::ostream&
operator<<(std::ostream&, const ntp_reconciliation_state&);
Expand All @@ -2801,19 +2820,29 @@ class ntp_reconciliation_state

private:
model::ntp _ntp;
std::vector<backend_operation> _backend_operations;
ss::chunked_fifo<backend_operation> _backend_operations;
reconciliation_status _status;
errc _error;
};

struct node_backend_operations {
node_backend_operations(
model::node_id id, std::vector<backend_operation> ops)
model::node_id id, ss::chunked_fifo<backend_operation> ops)
: node_id(id)
, backend_operations(std::move(ops)) {}

model::node_id node_id;
std::vector<backend_operation> backend_operations;
ss::chunked_fifo<backend_operation> backend_operations;

/// Returns an explicit copy of this entry: the same node_id paired with an
/// element-by-element copy of backend_operations.
node_backend_operations copy() const {
    ss::chunked_fifo<backend_operation> ops_copy;
    ops_copy.reserve(backend_operations.size());
    for (const auto& op : backend_operations) {
        ops_copy.push_back(op);
    }
    return {node_id, std::move(ops_copy)};
}
};

struct node_error {
Expand All @@ -2826,7 +2855,7 @@ struct node_error {
};

struct global_reconciliation_state {
absl::node_hash_map<model::ntp, std::vector<node_backend_operations>>
absl::node_hash_map<model::ntp, ss::chunked_fifo<node_backend_operations>>
ntp_backend_operations;
std::vector<node_error> node_errors;
};
Expand Down Expand Up @@ -2868,6 +2897,16 @@ struct reconciliation_state_reply
return o;
}

/// Returns an explicit copy of this reply, built by calling copy() on every
/// entry of `results`.
reconciliation_state_reply copy() const {
    std::vector<ntp_reconciliation_state> copied;
    copied.reserve(results.size());
    std::transform(
      results.begin(),
      results.end(),
      std::back_inserter(copied),
      [](const ntp_reconciliation_state& state) { return state.copy(); });

    return reconciliation_state_reply{.results = std::move(copied)};
}

auto serde_fields() { return std::tie(results); }
};

Expand Down
35 changes: 31 additions & 4 deletions src/v/compat/cluster_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,37 @@ GEN_COMPAT_CHECK(
{ json_write(ntps); },
{ json_read(ntps); });

GEN_COMPAT_CHECK(
cluster::reconciliation_state_reply,
{ json_write(results); },
{ json_read(results); });
// Hand-written compat_check specialization for
// cluster::reconciliation_state_reply, spelled out explicitly instead of
// being generated with GEN_COMPAT_CHECK.
// NOTE(review): presumably needed because the reply holds ss::chunked_fifo
// and therefore cannot be copied implicitly — confirm.
template<>
struct compat_check<cluster::reconciliation_state_reply> {
// Name of this check.
static constexpr std::string_view name = "reconciliation_state_reply";

// Returns the instances produced by generate_instances<>() for this type.
static std::vector<cluster::reconciliation_state_reply>
create_test_cases() {
return generate_instances<cluster::reconciliation_state_reply>();
}

// Writes the `results` member of obj to the JSON writer.
static void to_json(
cluster::reconciliation_state_reply obj,
json::Writer<json::StringBuffer>& wr) {
json::write_member(wr, "results", obj.results);
}

// Default-constructs a reply and fills its `results` member from the JSON
// value.
static cluster::reconciliation_state_reply from_json(json::Value& rd) {
cluster::reconciliation_state_reply obj;
json::read_member(rd, "results", obj.results);
return obj;
}

// Moves obj into compat_binary::serde_and_adl and returns its result.
static std::vector<compat_binary>
to_binary(cluster::reconciliation_state_reply obj) {
return compat_binary::serde_and_adl(std::move(obj));
}

// Moves both the object and the binary test data into verify_adl_or_serde.
static void
check(cluster::reconciliation_state_reply obj, compat_binary test) {
verify_adl_or_serde(std::move(obj), std::move(test));
}
};

GEN_COMPAT_CHECK(
cluster::finish_partition_update_request,
Expand Down
6 changes: 3 additions & 3 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ struct instance_generator<cluster::ntp_reconciliation_state> {
static cluster::ntp_reconciliation_state random() {
return {
model::random_ntp(),
tests::random_vector([] {
tests::random_chunked_fifo([] {
return instance_generator<cluster::backend_operation>::random();
}),
random_generators::random_choice(
Expand All @@ -548,7 +548,7 @@ struct instance_generator<cluster::ntp_reconciliation_state> {
}

static std::vector<cluster::ntp_reconciliation_state> limits() {
return {{}};
return {};
}
};

Expand All @@ -564,7 +564,7 @@ struct instance_generator<cluster::reconciliation_state_reply> {
}

static std::vector<cluster::reconciliation_state_reply> limits() {
return {{}};
return {};
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ inline void rjson_serialize(
inline void
read_value(json::Value const& rd, cluster::ntp_reconciliation_state& obj) {
model::ntp ntp;
std::vector<cluster::backend_operation> operations;
ss::chunked_fifo<cluster::backend_operation> operations;
cluster::reconciliation_status status;
cluster::errc error;

Expand Down
11 changes: 8 additions & 3 deletions src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2758,9 +2758,14 @@ admin_server::get_reconfigurations_handler(std::unique_ptr<ss::http::request>) {
reconfiguration_states.error()),
ss::http::reply::status_type::service_unavailable);
}
// We are forced to use a shared pointer here because the underlying
// chunked_fifo is not copyable.
auto reconciliations_ptr
= ss::make_lw_shared<cluster::global_reconciliation_state>(
std::move(reconciliations));
co_return ss::json::json_return_type(ss::json::stream_range_as_array(
std::move(reconfiguration_states.value()),
[reconciliations = std::move(reconciliations)](auto& s) {
[reconciliations = std::move(reconciliations_ptr)](auto& s) {
reconfiguration r;
r.ns = s.ntp.ns;
r.topic = s.ntp.tp.topic;
Expand Down Expand Up @@ -2795,8 +2800,8 @@ admin_server::get_reconfigurations_handler(std::unique_ptr<ss::http::request>) {
r.bytes_left_to_move = s.current_partition_size;
}

auto it = reconciliations.ntp_backend_operations.find(s.ntp);
if (it != reconciliations.ntp_backend_operations.end()) {
auto it = reconciliations->ntp_backend_operations.find(s.ntp);
if (it != reconciliations->ntp_backend_operations.end()) {
for (auto& node_ops : it->second) {
seastar::httpd::partition_json::
partition_reconciliation_status per_node_status;
Expand Down

0 comments on commit c4826fd

Please sign in to comment.