diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 544e76a8988d9..88d13f42b336e 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -27,6 +27,7 @@ #include "rpc/connection_cache.h" #include "ssx/future-util.h" +#include #include #include #include @@ -142,7 +143,7 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { co_return co_await get_reconciliation_state(std::move(ntps)); } -ss::future> +ss::future> controller_api::get_remote_core_deltas(model::ntp ntp, ss::shard_id shard) { return _backend.invoke_on( shard, [ntp = std::move(ntp)](controller_backend& backend) { @@ -164,7 +165,7 @@ controller_api::get_reconciliation_state(model::ntp ntp) { std::move(ntp), errc::partition_not_exists); } // query controller backends for in progress operations - std::vector ops; + ss::chunked_fifo ops; const auto shards = boost::irange(0, ss::smp::count); for (auto shard : shards) { auto local_deltas = co_await get_remote_core_deltas(ntp, shard); @@ -268,7 +269,7 @@ controller_api::get_reconciliation_state( } vassert(result.value().size() == 1, "result MUST contain single ntp"); - return ret_t(result.value().front()); + return ret_t(std::move(result.value().front())); }); } diff --git a/src/v/cluster/controller_api.h b/src/v/cluster/controller_api.h index 5bc317da24306..7026468ac71c4 100644 --- a/src/v/cluster/controller_api.h +++ b/src/v/cluster/controller_api.h @@ -20,6 +20,7 @@ #include "seastarx.h" #include +#include #include #include @@ -88,7 +89,7 @@ class controller_api { std::optional shard_for(const model::ntp& ntp) const; private: - ss::future> + ss::future> get_remote_core_deltas(model::ntp, ss::shard_id); model::node_id _self; diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 9525445b74603..eccba78a13045 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -46,6 +46,7 @@ #include #include #include +#include #include #include @@ -589,7 +590,6 @@ controller_backend::deltas_t calculate_bootstrap_deltas( // *it is not included auto start = it.base(); - result_delta.reserve(std::distance(start, deltas.end())); std::move(start, deltas.end(), std::back_inserter(result_delta)); return result_delta; } @@ -2020,10 +2020,14 @@ ss::future<> controller_backend::delete_partition( co_await _partition_manager.local().remove(ntp, mode); } -std::vector +ss::chunked_fifo controller_backend::list_ntp_deltas(const model::ntp& ntp) const { if (auto it = _topic_deltas.find(ntp); it != _topic_deltas.end()) { - return it->second; + ss::chunked_fifo ret; + ret.reserve(it->second.size()); + std::copy( + it->second.begin(), it->second.end(), std::back_inserter(ret)); + return ret; } return {}; diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index 8cfca654e82ad..10bb5ed178969 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -23,6 +23,7 @@ #include "storage/api.h" #include +#include #include #include @@ -30,6 +31,7 @@ #include #include +#include #include namespace cluster { @@ -231,7 +233,7 @@ class controller_backend friend std::ostream& operator<<(std::ostream&, const delta_metadata&); }; - using deltas_t = std::vector; + using deltas_t = std::deque; using results_t = std::vector; controller_backend( ss::sharded&, @@ -247,7 +249,7 @@ class controller_backend ss::future<> stop(); ss::future<> start(); - std::vector list_ntp_deltas(const model::ntp&) const; + ss::chunked_fifo list_ntp_deltas(const model::ntp&) const; private: struct cross_shard_move_request { @@ -437,6 +439,6 @@ class controller_backend ss::metrics::metric_groups _metrics; }; -std::vector calculate_bootstrap_deltas( - model::node_id self, const std::vector&); +controller_backend::deltas_t calculate_bootstrap_deltas( + model::node_id self, const controller_backend::deltas_t&); } // namespace cluster diff --git a/src/v/cluster/tests/controller_backend_test.cc b/src/v/cluster/tests/controller_backend_test.cc index ae1cfebcff9dd..2b40f68d2a287 100644 --- a/src/v/cluster/tests/controller_backend_test.cc +++ b/src/v/cluster/tests/controller_backend_test.cc @@ -31,7 +31,7 @@ make_assignment(std::vector replicas) { using op_t = cluster::topic_table::delta::op_type; using delta_t = cluster::topic_table::delta; using meta_t = cluster::controller_backend::delta_metadata; -using deltas_t = std::vector; +using deltas_t = std::deque; meta_t make_delta( int64_t o, @@ -142,11 +142,14 @@ SEASTAR_THREAD_TEST_CASE(update_including_current_node) { current_node, std::move(d_1)); BOOST_REQUIRE_EQUAL(deltas.size(), 3); - BOOST_REQUIRE_EQUAL(deltas[0].delta.offset, recreate_current.delta.offset); BOOST_REQUIRE_EQUAL( - deltas[1].delta.offset, update_with_current.delta.offset); + deltas.begin()->delta.offset, recreate_current.delta.offset); BOOST_REQUIRE_EQUAL( - deltas[2].delta.offset, finish_update_with_current.delta.offset); + std::next(deltas.begin())->delta.offset, + update_with_current.delta.offset); + BOOST_REQUIRE_EQUAL( + std::next(deltas.begin(), 2)->delta.offset, + finish_update_with_current.delta.offset); } SEASTAR_THREAD_TEST_CASE(update_excluding_current_node) { @@ -196,9 +199,10 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node) { BOOST_REQUIRE_EQUAL(deltas.size(), 2); BOOST_REQUIRE_EQUAL( - deltas[0].delta.offset, update_with_current_2.delta.offset); + deltas.begin()->delta.offset, update_with_current_2.delta.offset); BOOST_REQUIRE_EQUAL( - deltas[1].delta.offset, finish_update_with_current_2.delta.offset); + std::next(deltas.begin())->delta.offset, + finish_update_with_current_2.delta.offset); } SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) { @@ -218,5 +222,5 @@ SEASTAR_THREAD_TEST_CASE(move_back_to_current_node_not_finished) { BOOST_REQUIRE_EQUAL(deltas.size(), 1); BOOST_REQUIRE_EQUAL( - deltas[0].delta.offset, update_with_current_2.delta.offset); + deltas.begin()->delta.offset, update_with_current_2.delta.offset); } diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index 59941d5c34839..3eea1670a7cb6 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -1428,7 +1428,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { roundtrip_test(data); } { - std::vector backend_operations; + ss::chunked_fifo backend_operations; for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) { backend_operations.push_back(cluster::backend_operation{ .source_shard = random_generators::get_int(1000), @@ -1436,18 +1436,19 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { .type = cluster::topic_table_delta::op_type::del, }); } + cluster::ntp_reconciliation_state data{ model::random_ntp(), - backend_operations, + std::move(backend_operations), cluster::reconciliation_status::error, cluster::errc::feature_disabled, }; - roundtrip_test(data); + roundtrip_test(std::move(data)); } { std::vector results; for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) { - std::vector backend_operations; + ss::chunked_fifo backend_operations; for (int j = 0, mj = random_generators::get_int(10); j < mj; j++) { backend_operations.push_back(cluster::backend_operation{ .source_shard = random_generators::get_int(1000), @@ -1455,17 +1456,16 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { .type = cluster::topic_table_delta::op_type::del, }); } - results.push_back(cluster::ntp_reconciliation_state{ + results.emplace_back( model::random_ntp(), - backend_operations, + std::move(backend_operations), cluster::reconciliation_status::error, - cluster::errc::feature_disabled, - }); + cluster::errc::feature_disabled); } cluster::reconciliation_state_reply data{ - .results = results, + .results = std::move(results), }; - roundtrip_test(data); + roundtrip_test(std::move(data)); } { cluster::create_acls_cmd_data create_acls_data{}; diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 5184637dfa0ec..61aeb9c69ab65 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -245,14 +245,14 @@ topic_table_delta::get_replica_revision(model::node_id replica) const { ntp_reconciliation_state::ntp_reconciliation_state( model::ntp ntp, - std::vector ops, + ss::chunked_fifo ops, reconciliation_status status) : ntp_reconciliation_state( std::move(ntp), std::move(ops), status, errc::success) {} ntp_reconciliation_state::ntp_reconciliation_state( model::ntp ntp, - std::vector ops, + ss::chunked_fifo ops, reconciliation_status status, errc ec) : _ntp(std::move(ntp)) @@ -1546,12 +1546,11 @@ void adl::to( cluster::ntp_reconciliation_state adl::from(iobuf_parser& in) { auto ntp = adl{}.from(in); - auto ops = adl>{}.from(in); + auto ops = adl>{}.from(in); auto status = adl{}.from(in); auto error = adl{}.from(in); - return cluster::ntp_reconciliation_state( - std::move(ntp), std::move(ops), status, error); + return {std::move(ntp), std::move(ops), status, error}; } void adl::to( diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index bce55d76e21af..752c939e6b2fd 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -2763,23 +2763,23 @@ class ntp_reconciliation_state // success case ntp_reconciliation_state( - model::ntp, std::vector, reconciliation_status); + model::ntp, ss::chunked_fifo, reconciliation_status); // error ntp_reconciliation_state(model::ntp, cluster::errc); ntp_reconciliation_state( model::ntp, - std::vector, + ss::chunked_fifo, reconciliation_status, cluster::errc); const model::ntp& ntp() const { return _ntp; } - const std::vector& pending_operations() const { + const ss::chunked_fifo& pending_operations() const { return _backend_operations; } - std::vector& pending_operations() { + ss::chunked_fifo& pending_operations() { return _backend_operations; } @@ -2788,9 +2788,28 @@ class ntp_reconciliation_state std::error_code error() const { return make_error_code(_error); } errc cluster_errc() const { return _error; } - friend bool - operator==(const ntp_reconciliation_state&, const ntp_reconciliation_state&) - = default; + friend bool operator==( + const ntp_reconciliation_state& lhs, + const ntp_reconciliation_state& rhs) { + return lhs._ntp == rhs._ntp && lhs._status == rhs._status + && lhs._error == rhs._error + && lhs._backend_operations.size() + == rhs._backend_operations.size() + && std::equal( + lhs._backend_operations.begin(), + lhs._backend_operations.end(), + rhs._backend_operations.begin()); + }; + + ntp_reconciliation_state copy() const { + ss::chunked_fifo backend_operations; + backend_operations.reserve(_backend_operations.size()); + std::copy( + _backend_operations.begin(), + _backend_operations.end(), + std::back_inserter(backend_operations)); + return {_ntp, std::move(backend_operations), _status, _error}; + } friend std::ostream& operator<<(std::ostream&, const ntp_reconciliation_state&); @@ -2801,19 +2820,29 @@ class ntp_reconciliation_state private: model::ntp _ntp; - std::vector _backend_operations; + ss::chunked_fifo _backend_operations; reconciliation_status _status; errc _error; }; struct node_backend_operations { node_backend_operations( - model::node_id id, std::vector ops) + model::node_id id, ss::chunked_fifo ops) : node_id(id) , backend_operations(std::move(ops)) {} model::node_id node_id; - std::vector backend_operations; + ss::chunked_fifo backend_operations; + + node_backend_operations copy() const { + ss::chunked_fifo b_ops; + b_ops.reserve(backend_operations.size()); + std::copy( + backend_operations.begin(), + backend_operations.end(), + std::back_inserter(b_ops)); + return {node_id, std::move(b_ops)}; + } }; struct node_error { @@ -2826,7 +2855,7 @@ struct node_error { }; struct global_reconciliation_state { - absl::node_hash_map> + absl::node_hash_map> ntp_backend_operations; std::vector node_errors; }; @@ -2868,6 +2897,16 @@ struct reconciliation_state_reply return o; } + reconciliation_state_reply copy() const { + std::vector results_cp; + results_cp.reserve(results.size()); + for (auto& r : results) { + results_cp.push_back(r.copy()); + } + + return reconciliation_state_reply{.results = std::move(results_cp)}; + } + auto serde_fields() { return std::tie(results); } }; diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 9bea3b000fd86..20f0d665b09dd 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -209,10 +209,37 @@ GEN_COMPAT_CHECK( { json_write(ntps); }, { json_read(ntps); }); -GEN_COMPAT_CHECK( - cluster::reconciliation_state_reply, - { json_write(results); }, - { json_read(results); }); +template<> +struct compat_check { + static constexpr std::string_view name = "reconciliation_state_reply"; + + static std::vector + create_test_cases() { + return generate_instances(); + } + + static void to_json( + cluster::reconciliation_state_reply obj, + json::Writer& wr) { + json::write_member(wr, "results", obj.results); + } + + static cluster::reconciliation_state_reply from_json(json::Value& rd) { + cluster::reconciliation_state_reply obj; + json::read_member(rd, "results", obj.results); + return obj; + } + + static std::vector + to_binary(cluster::reconciliation_state_reply obj) { + return compat_binary::serde_and_adl(std::move(obj)); + } + + static void + check(cluster::reconciliation_state_reply obj, compat_binary test) { + verify_adl_or_serde(std::move(obj), std::move(test)); + } +}; GEN_COMPAT_CHECK( cluster::finish_partition_update_request, diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 4863e8db0537f..79e2bb06eb726 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -537,7 +537,7 @@ struct instance_generator { static cluster::ntp_reconciliation_state random() { return { model::random_ntp(), - tests::random_vector([] { + tests::random_chunked_fifo([] { return instance_generator::random(); }), random_generators::random_choice( @@ -548,7 +548,7 @@ struct instance_generator { } static std::vector limits() { - return {{}}; + return {}; } }; @@ -564,7 +564,7 @@ struct instance_generator { } static std::vector limits() { - return {{}}; + return {}; } }; diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index 0bd03f8a164b8..a4d4bf1ae814f 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -529,7 +529,7 @@ inline void rjson_serialize( inline void read_value(json::Value const& rd, cluster::ntp_reconciliation_state& obj) { model::ntp ntp; - std::vector operations; + ss::chunked_fifo operations; cluster::reconciliation_status status; cluster::errc error; diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 6a4a21c1dfd8f..33b3ebed082a8 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -2775,9 +2775,14 @@ admin_server::get_reconfigurations_handler(std::unique_ptr) { reconfiguration_states.error()), ss::http::reply::status_type::service_unavailable); } + // we are forced to use shared pointer as underlying chunked_fifo is not + // copyable + auto reconciliations_ptr + = ss::make_lw_shared( + std::move(reconciliations)); co_return ss::json::json_return_type(ss::json::stream_range_as_array( std::move(reconfiguration_states.value()), - [reconciliations = std::move(reconciliations)](auto& s) { + [reconciliations = std::move(reconciliations_ptr)](auto& s) { reconfiguration r; r.ns = s.ntp.ns; r.topic = s.ntp.tp.topic; @@ -2812,8 +2817,8 @@ admin_server::get_reconfigurations_handler(std::unique_ptr) { r.bytes_left_to_move = s.current_partition_size; } - auto it = reconciliations.ntp_backend_operations.find(s.ntp); - if (it != reconciliations.ntp_backend_operations.end()) { + auto it = reconciliations->ntp_backend_operations.find(s.ntp); + if (it != reconciliations->ntp_backend_operations.end()) { for (auto& node_ops : it->second) { seastar::httpd::partition_json:: partition_reconciliation_status per_node_status; diff --git a/src/v/utils/fragmented_vector.h b/src/v/utils/fragmented_vector.h index e4a6274f312f8..1d3b223e1176e 100644 --- a/src/v/utils/fragmented_vector.h +++ b/src/v/utils/fragmented_vector.h @@ -239,6 +239,8 @@ class fragmented_vector { return tmp; } + pointer operator->() const { return &_vec->operator[](_index); } + iter operator+(difference_type offset) { return iter{*this} += offset; } iter operator-(difference_type offset) { return iter{*this} -= offset; }