From b91ae5b2fd0dd39b387a1d83c56540aacd9f67e0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 18 Oct 2023 17:30:40 +0200 Subject: [PATCH 01/10] features: add disabling_partitions feature --- src/v/features/feature_table.cc | 2 ++ src/v/features/feature_table.h | 10 +++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index a2238bec6fd11..a63a0247d1d77 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -91,6 +91,8 @@ std::string_view to_string_view(feature f) { return "idempotency_v2"; case feature::fast_partition_reconfiguration: return "fast_partition_reconfiguration"; + case feature::disabling_partitions: + return "disabling_partitions"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index 78832629bc0d1..1f3aa48e3a5c9 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -70,6 +70,7 @@ enum class feature : std::uint64_t { raft_config_serde = 1ULL << 36U, idempotency_v2 = 1ULL << 37U, fast_partition_reconfiguration = 1ULL << 38U, + disabling_partitions = 1ULL << 39U, // Dummy features for testing only test_alpha = 1ULL << 61U, @@ -342,7 +343,14 @@ constexpr static std::array feature_schema{ "fast_partition_reconfiguration", feature::fast_partition_reconfiguration, feature_spec::available_policy::always, - feature_spec::prepare_policy::always}}; + feature_spec::prepare_policy::always}, + feature_spec{ + cluster::cluster_version{11}, + "disabling_partitions", + feature::disabling_partitions, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, +}; std::string_view to_string_view(feature); std::string_view to_string_view(feature_state::state); From 725a5510b01990f73db9d84dcb13af3834c244c9 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 18 Oct 2023 18:25:07 +0200 Subject: [PATCH 02/10] cluster: add set_topic_partitions_disabled_cmd controller command --- src/v/cluster/commands.h | 8 ++++++++ src/v/cluster/controller_log_limiter.h | 3 ++- src/v/cluster/types.cc | 11 +++++++++++ src/v/cluster/types.h | 21 +++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/commands.h b/src/v/cluster/commands.h index a2ff9fd977dbf..63fbee0cc85b3 100644 --- a/src/v/cluster/commands.h +++ b/src/v/cluster/commands.h @@ -98,6 +98,7 @@ static constexpr int8_t revert_cancel_partition_move_cmd_type = 9; static constexpr int8_t topic_lifecycle_transition_cmd_type = 10; static constexpr int8_t force_partition_reconfiguration_type = 11; static constexpr int8_t update_partition_replicas_cmd_type = 12; +static constexpr int8_t set_topic_partitions_disabled_cmd_type = 13; static constexpr int8_t create_user_cmd_type = 5; static constexpr int8_t delete_user_cmd_type = 6; @@ -214,6 +215,13 @@ using force_partition_reconfiguration_cmd = controller_command< model::record_batch_type::topic_management_cmd, serde_opts::serde_only>; +using set_topic_partitions_disabled_cmd = controller_command< + int8_t, // unused + set_topic_partitions_disabled_cmd_data, + set_topic_partitions_disabled_cmd_type, + model::record_batch_type::topic_management_cmd, + serde_opts::serde_only>; + /** * new extendible version of move_partition_replicas command */ diff --git a/src/v/cluster/controller_log_limiter.h b/src/v/cluster/controller_log_limiter.h index ed05df5d5d632..812c4ef2dfac1 100644 --- a/src/v/cluster/controller_log_limiter.h +++ b/src/v/cluster/controller_log_limiter.h @@ -77,7 +77,8 @@ class controller_log_limiter { std::is_same_v || // std::is_same_v || // std::is_same_v || // - std::is_same_v) { + std::is_same_v || // + std::is_same_v) { return _topic_operations_limiter.try_throttle(); } else if constexpr ( std::is_same_v || // diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index cbd0698a6b86c..abe9781327cd0 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1064,6 +1064,17 @@ operator<<(std::ostream& o, const force_partition_reconfiguration_cmd_data& r) { return o; } +std::ostream& +operator<<(std::ostream& o, const set_topic_partitions_disabled_cmd_data& r) { + fmt::print( + o, + "{{topic: {}, partition_id: {}, disabled: {}}}", + r.ns_tp, + r.partition_id, + r.disabled); + return o; +} + std::ostream& operator<<( std::ostream& o, const feature_update_license_update_cmd_data& fulu) { fmt::print(o, "{{redpanda_license {}}}", fulu.redpanda_license); diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 2a8b50f94e577..917aa9c9fa063 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -2891,6 +2891,27 @@ struct move_topic_replicas_data operator<<(std::ostream&, const move_topic_replicas_data&); }; +struct set_topic_partitions_disabled_cmd_data + : serde::envelope< + set_topic_partitions_disabled_cmd_data, + serde::version<0>, + serde::compat_version<0>> { + model::topic_namespace ns_tp; + // if nullopt, applies to all partitions of the topic. + std::optional partition_id; + bool disabled = false; + + auto serde_fields() { return std::tie(ns_tp, partition_id, disabled); } + + friend bool operator==( + const set_topic_partitions_disabled_cmd_data&, + const set_topic_partitions_disabled_cmd_data&) + = default; + + friend std::ostream& + operator<<(std::ostream&, const set_topic_partitions_disabled_cmd_data&); +}; + struct feature_update_license_update_cmd_data : serde::envelope< feature_update_license_update_cmd_data, From cbd1f9099bc2d9f89e48532e3a09b8b517a915b0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 25 Oct 2023 18:24:55 +0200 Subject: [PATCH 03/10] cluster: apply set_topic_partitions_disabled_cmd to topic_table --- src/v/cluster/controller_snapshot.cc | 6 +++ src/v/cluster/controller_snapshot.h | 9 +++- src/v/cluster/topic_table.cc | 56 +++++++++++++++++++++-- src/v/cluster/topic_table.h | 43 +++++++++++++++++ src/v/cluster/topic_updates_dispatcher.cc | 5 ++ src/v/cluster/topic_updates_dispatcher.h | 5 +- src/v/cluster/types.cc | 38 +++++++++++++++ src/v/cluster/types.h | 35 ++++++++++++++ 8 files changed, 191 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/controller_snapshot.cc b/src/v/cluster/controller_snapshot.cc index 149f4daa2c2be..59aa79883adf1 100644 --- a/src/v/cluster/controller_snapshot.cc +++ b/src/v/cluster/controller_snapshot.cc @@ -125,6 +125,7 @@ ss::future<> topics_t::serde_async_write(iobuf& out) { co_await write_map_async(out, std::move(topics)); serde::write(out, highest_group_id); co_await write_map_async(out, std::move(lifecycle_markers)); + co_await write_map_async(out, std::move(disabled_partitions)); } ss::future<> @@ -136,6 +137,11 @@ topics_t::serde_async_read(iobuf_parser& in, serde::header const h) { lifecycle_markers = co_await read_map_async_nested( in, h._bytes_left_limit); + if (h._version >= 1) { + disabled_partitions + = co_await read_map_async_nested( + in, h._bytes_left_limit); + } if (in.bytes_left() > h._bytes_left_limit) { in.skip(in.bytes_left() - h._bytes_left_limit); diff --git a/src/v/cluster/controller_snapshot.h b/src/v/cluster/controller_snapshot.h index 42a3a888801eb..3fdff44ac9d35 100644 --- a/src/v/cluster/controller_snapshot.h +++ b/src/v/cluster/controller_snapshot.h @@ -113,7 +113,7 @@ struct config_t struct topics_t : public serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { // NOTE: layout here is a bit different than in the topic table because it // allows more compact storage and more convenient generation of controller // backend deltas when applying the snapshot. @@ -184,6 +184,13 @@ struct topics_t nt_revision_eq> lifecycle_markers; + absl::node_hash_map< + model::topic_namespace, + topic_disabled_partitions_set, + model::topic_namespace_hash, + model::topic_namespace_eq> + disabled_partitions; + friend bool operator==(const topics_t&, const topics_t&) = default; ss::future<> serde_async_write(iobuf&); diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index f81773c0413e6..d20268825dbac 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -128,6 +128,7 @@ topic_table::do_local_delete(model::topic_namespace nt, model::offset offset) { } _topics.erase(tp); + _disabled_partitions.erase(nt); _topics_map_revision++; notify_waiters(); _probe.handle_topic_deletion(nt); @@ -620,6 +621,45 @@ topic_table::apply(force_partition_reconfiguration_cmd cmd, model::offset o) { return ss::make_ready_future(errc::success); } +ss::future +topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) { + _last_applied_revision_id = model::revision_id(o); + + auto topic_it = _topics.find(cmd.value.ns_tp); + if (topic_it == _topics.end()) { + co_return errc::topic_not_exists; + } + + if (cmd.value.partition_id) { + const auto& assignments = topic_it->second.get_assignments(); + if (!assignments.contains(*cmd.value.partition_id)) { + co_return errc::partition_not_exists; + } + + auto [disabled_it, inserted] = _disabled_partitions.try_emplace( + cmd.value.ns_tp); + auto& disabled_set = disabled_it->second; + + if (cmd.value.disabled) { + disabled_set.add(*cmd.value.partition_id); + } else { + disabled_set.remove(*cmd.value.partition_id, assignments); + } + + if (disabled_set.is_empty()) { + _disabled_partitions.erase(disabled_it); + } + } else { + if (cmd.value.disabled) { + _disabled_partitions[cmd.value.ns_tp].set_topic_disabled(); + } else { + _disabled_partitions.erase(cmd.value.ns_tp); + } + } + + co_return errc::success; +} + template void incremental_update( std::optional& property, property_update> override) { @@ -858,6 +898,11 @@ topic_table::fill_snapshot(controller_snapshot& controller_snap) const { for (const auto& [ntr, lm] : _lifecycle_markers) { snap.lifecycle_markers.emplace(ntr, lm); } + + for (const auto& [ns_tp, dps] : _disabled_partitions) { + snap.disabled_partitions.emplace(ns_tp, dps); + co_await ss::coroutine::maybe_yield(); + } } // helper class to hold context needed for adding/deleting ntps when applying a @@ -1212,6 +1257,13 @@ ss::future<> topic_table::apply_snapshot( } } + // Lifecycle markers is a simple static collection without notifications + // etc, so we can just copy directly into place. + _lifecycle_markers = controller_snap.topics.lifecycle_markers; + + // Same for disabled partitions. + _disabled_partitions = controller_snap.topics.disabled_partitions; + // 2. re-calculate derived state _partition_count = 0; @@ -1223,10 +1275,6 @@ ss::future<> topic_table::apply_snapshot( // 3. notify delta waiters notify_waiters(); - // Lifecycle markers is a simple static collection without notifications - // etc, so we can just copy directly into place. - _lifecycle_markers = controller_snap.topics.lifecycle_markers; - _last_applied_revision_id = snap_revision; } diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index b07607c1feb45..e315e3f4ee7ef 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -244,6 +244,12 @@ class topic_table { nt_revision_hash, nt_revision_eq>; + using disabled_partitions_t = absl::node_hash_map< + model::topic_namespace, + topic_disabled_partitions_set, + model::topic_namespace_hash, + model::topic_namespace_eq>; + using delta_range_t = boost::iterator_range::const_iterator>; using delta_cb_t = ss::noncopyable_function; @@ -320,6 +326,8 @@ class topic_table { apply(force_partition_reconfiguration_cmd, model::offset); ss::future apply(update_partition_replicas_cmd, model::offset); + ss::future + apply(set_topic_partitions_disabled_cmd, model::offset); ss::future<> fill_snapshot(controller_snapshot&) const; ss::future<> @@ -493,6 +501,40 @@ class topic_table { return _lifecycle_markers; } + const disabled_partitions_t& get_disabled_partitions() const { + return _disabled_partitions; + } + + const topic_disabled_partitions_set* + get_topic_disabled_set(model::topic_namespace_view ns_tp) const { + auto it = _disabled_partitions.find(ns_tp); + if (it == _disabled_partitions.end()) { + return nullptr; + } + return &it->second; + } + + bool is_disabled(model::topic_namespace_view ns_tp) const { + auto it = _disabled_partitions.find(ns_tp); + if (it == _disabled_partitions.end()) { + return false; + } + return it->second.is_topic_disabled(); + } + + bool is_disabled( + model::topic_namespace_view ns_tp, model::partition_id p_id) const { + auto it = _disabled_partitions.find(ns_tp); + if (it == _disabled_partitions.end()) { + return false; + } + return it->second.is_disabled(p_id); + } + + bool is_disabled(const model::ntp& ntp) const { + return is_disabled(model::topic_namespace_view{ntp}, ntp.tp.partition); + } + auto topics_iterator_begin() const { return stable_iterator< underlying_t::const_iterator, @@ -542,6 +584,7 @@ class topic_table { underlying_t _topics; lifecycle_markers_t _lifecycle_markers; + disabled_partitions_t _disabled_partitions; size_t _partition_count{0}; updates_t _updates_in_progress; diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 3d971e32bb48c..326ee5f0953c3 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -469,6 +469,11 @@ ss::future topic_updates_dispatcher::apply( co_return ec; } +ss::future topic_updates_dispatcher::apply( + set_topic_partitions_disabled_cmd cmd, model::offset base_offset) { + co_return co_await dispatch_updates_to_cores(cmd, base_offset); +} + topic_updates_dispatcher::in_progress_map topic_updates_dispatcher::collect_in_progress( const model::topic_namespace& tp_ns, diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index f9518f103f110..470f18a234c34 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -73,7 +73,8 @@ class topic_updates_dispatcher { move_topic_replicas_cmd, revert_cancel_partition_move_cmd, force_partition_reconfiguration_cmd, - update_partition_replicas_cmd>(); + update_partition_replicas_cmd, + set_topic_partitions_disabled_cmd>(); bool is_batch_applicable(const model::record_batch& batch) const { return batch.header().type @@ -106,6 +107,8 @@ class topic_updates_dispatcher { apply(force_partition_reconfiguration_cmd, model::offset); ss::future apply(update_partition_replicas_cmd, model::offset); + ss::future + apply(set_topic_partitions_disabled_cmd, model::offset); using ntp_leader = std::pair; diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index abe9781327cd0..74f96e6ec46d7 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1330,6 +1330,44 @@ operator<<(std::ostream& o, const update_partition_replicas_cmd_data& data) { return o; } +std::ostream& +operator<<(std::ostream& o, const topic_disabled_partitions_set& disabled) { + if (disabled.partitions) { + fmt::print( + o, + "{{partitions: {}}}", + std::vector( + disabled.partitions->begin(), disabled.partitions->end())); + } else { + fmt::print(o, "{{partitions: all}}"); + } + return o; +} + +void topic_disabled_partitions_set::add(model::partition_id id) { + if (partitions) { + partitions->insert(id); + } else { + // do nothing, std::nullopt means all partitions are already + // disabled. + } +} + +void topic_disabled_partitions_set::remove( + model::partition_id id, const assignments_set& all_partitions) { + if (!all_partitions.contains(id)) { + return; + } + if (!partitions) { + partitions = absl::node_hash_set{}; + partitions->reserve(all_partitions.size()); + for (const auto& p : all_partitions) { + partitions->insert(p.id); + } + } + partitions->erase(id); +} + } // namespace cluster namespace reflection { diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 917aa9c9fa063..92b9075bf6463 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -4377,6 +4377,41 @@ struct update_partition_replicas_cmd_data operator<<(std::ostream&, const update_partition_replicas_cmd_data&); }; +struct topic_disabled_partitions_set + : serde::envelope< + topic_disabled_partitions_set, + serde::version<0>, + serde::compat_version<0>> { + // std::nullopt means that the topic is fully disabled. + // This is different from the partitions set containing all partition ids, + // as it will also affect partitions created later with create_partitions + // request. + std::optional> partitions; + + topic_disabled_partitions_set() noexcept + : partitions(absl::node_hash_set{}) {} + + friend bool operator==( + const topic_disabled_partitions_set&, + const topic_disabled_partitions_set&) + = default; + + auto serde_fields() { return std::tie(partitions); } + + friend std::ostream& + operator<<(std::ostream&, const topic_disabled_partitions_set&); + + bool is_disabled(model::partition_id id) const { + return !partitions || partitions->contains(id); + } + bool is_topic_disabled() const { return !partitions.has_value(); } + bool is_empty() const { return partitions && partitions->empty(); } + + void add(model::partition_id id); + void remove(model::partition_id id, const assignments_set& all_partitions); + void set_topic_disabled() { partitions = std::nullopt; } +}; + } // namespace cluster namespace std { template<> From 979afe69c31af3d8163072b93b01dcf3d07ac81b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 25 Oct 2023 18:26:48 +0200 Subject: [PATCH 04/10] c/topics_frontend: add set_topic_partitions_disabled method --- src/v/cluster/topics_frontend.cc | 48 ++++++++++++++++++++++++++++++++ src/v/cluster/topics_frontend.h | 11 ++++++++ 2 files changed, 59 insertions(+) diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index 86d2e25e04c3c..472adc139810b 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -1011,6 +1011,54 @@ ss::future> topics_frontend::create_partitions( co_return result; } +ss::future topics_frontend::set_topic_partitions_disabled( + model::topic_namespace_view ns_tp, + std::optional p_id, + bool disabled, + model::timeout_clock::time_point timeout) { + if (!_features.local().is_active(features::feature::disabling_partitions)) { + co_return errc::feature_disabled; + } + + auto r = co_await stm_linearizable_barrier(timeout); + if (!r) { + co_return r.error(); + } + + // pre-replicate checks + + if (p_id) { + if (!_topics.local().contains(ns_tp, *p_id)) { + co_return errc::partition_not_exists; + } + if (_topics.local().is_disabled(ns_tp, *p_id) == disabled) { + // no-op + co_return errc::success; + } + } else { + if (!_topics.local().contains(ns_tp)) { + co_return errc::topic_not_exists; + } + if (_topics.local().is_disabled(ns_tp) == disabled) { + // no-op + co_return errc::success; + } + } + + // replicate the command + + set_topic_partitions_disabled_cmd cmd( + 0, // unused + set_topic_partitions_disabled_cmd_data{ + .ns_tp = model::topic_namespace{ns_tp}, + .partition_id = p_id, + .disabled = disabled, + }); + + co_return co_await replicate_and_wait( + _stm, _features, _as, std::move(cmd), timeout); +} + ss::future topics_frontend::validate_shard(model::node_id node, uint32_t shard) const { return _allocator.invoke_on( diff --git a/src/v/cluster/topics_frontend.h b/src/v/cluster/topics_frontend.h index 807a722fbeb79..425ec492d8f5d 100644 --- a/src/v/cluster/topics_frontend.h +++ b/src/v/cluster/topics_frontend.h @@ -138,6 +138,17 @@ class topics_frontend { std::vector, model::timeout_clock::time_point); + /// if partition_id is nullopt, the disabled flag is applied to all + /// partitions of the topic. + /// + /// The method is idempotent, i.e. if the disabled flag already has the + /// desired value, the call is ignored. + ss::future set_topic_partitions_disabled( + model::topic_namespace_view, + std::optional, + bool disabled, + model::timeout_clock::time_point); + ss::future validate_shard(model::node_id node, uint32_t shard) const; ss::future>> From e08490df58f26c3fc8c43babd57a64078f24dfa2 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 20 Oct 2023 14:42:13 +0200 Subject: [PATCH 05/10] admin: add POST /v1/cluster/partitions/{ns}/{topic}[/{id}] handlers Currently the only functionality is setting topic/partitions disabled --- src/v/redpanda/admin/api-doc/cluster.json | 62 +++++++++++++++ src/v/redpanda/admin_server.cc | 94 +++++++++++++++++++++++ src/v/redpanda/admin_server.h | 8 ++ 3 files changed, 164 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/cluster.json b/src/v/redpanda/admin/api-doc/cluster.json index e9be24af7dcbe..c687d94841904 100644 --- a/src/v/redpanda/admin/api-doc/cluster.json +++ b/src/v/redpanda/admin/api-doc/cluster.json @@ -66,6 +66,68 @@ "parameters": [] } ] + }, + { + "path": "/v1/cluster/partitions/{namespace}/{topic}", + "operations": [ + { + "method": "POST", + "summary": "Disable/enable all partitions of a topic", + "nickname": "post_cluster_partitions_topic", + "type": "void", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "topic", + "in": "path", + "required": true, + "type": "string" + } + ] + } + ] + }, + { + "path": "/v1/cluster/partitions/{namespace}/{topic}/{partition}", + "operations": [ + { + "method": "POST", + "summary": "Disable/enable a single partition", + "nickname": "post_cluster_partitions_topic_partition", + "type": "void", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "topic", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "partition", + "in": "path", + "required": true, + "type": "long" + } + ] + } + ] } ], "models": { diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index c8540f2c816fe..7ed36a3e83ec7 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -4931,6 +4931,84 @@ admin_server::cancel_all_partitions_reconfigs_handler( co_return ss::json::json_return_type( co_await map_partition_results(std::move(res.value()))); } + +static json::validator make_post_cluster_partitions_validator() { + const std::string schema = R"( +{ + "type": "object", + "properties": { + "disabled": { + "type": "boolean" + } + }, + "additionalProperties": false, + "required": ["disabled"] +} +)"; + return json::validator(schema); +} + +ss::future +admin_server::post_cluster_partitions_topic_handler( + std::unique_ptr req) { + if (need_redirect_to_leader(model::controller_ntp, _metadata_cache)) { + // In order that we can do a reliably ordered validation of + // the request (and drop no-op requests), run on controller leader; + throw co_await redirect_to_leader(*req, model::controller_ntp); + } + + auto ns_tp = model::topic_namespace{ + model::ns{req->param["namespace"]}, model::topic{req->param["topic"]}}; + + static thread_local auto body_validator( + make_post_cluster_partitions_validator()); + auto doc = co_await parse_json_body(req.get()); + apply_validator(body_validator, doc); + bool disabled = doc["disabled"].GetBool(); + + std::error_code err + = co_await _controller->get_topics_frontend() + .local() + .set_topic_partitions_disabled( + ns_tp, std::nullopt, disabled, model::timeout_clock::now() + 5s); + if (err) { + co_await throw_on_error(*req, err, model::controller_ntp); + } + + co_return ss::json::json_void(); +} + +ss::future +admin_server::post_cluster_partitions_topic_partition_handler( + std::unique_ptr req) { + if (need_redirect_to_leader(model::controller_ntp, _metadata_cache)) { + // In order that we can do a reliably ordered validation of + // the request (and drop no-op requests), run on controller leader; + throw co_await redirect_to_leader(*req, model::controller_ntp); + } + + auto ntp = parse_ntp_from_request(req->param); + + static thread_local auto body_validator( + make_post_cluster_partitions_validator()); + auto doc = co_await parse_json_body(req.get()); + apply_validator(body_validator, doc); + bool disabled = doc["disabled"].GetBool(); + + std::error_code err = co_await _controller->get_topics_frontend() + .local() + .set_topic_partitions_disabled( + model::topic_namespace_view{ntp}, + ntp.tp.partition, + disabled, + model::timeout_clock::now() + 5s); + if (err) { + co_await throw_on_error(*req, err, model::controller_ntp); + } + + co_return ss::json::json_void(); +} + void admin_server::register_cluster_routes() { register_route( ss::httpd::cluster_json::get_cluster_health_overview, @@ -5009,6 +5087,22 @@ void admin_server::register_cluster_routes() { } return ss::json::json_return_type(ss::json::json_void()); }); + + register_cluster_partitions_routes(); +} + +void admin_server::register_cluster_partitions_routes() { + register_route( + ss::httpd::cluster_json::post_cluster_partitions_topic, + [this](std::unique_ptr req) { + return post_cluster_partitions_topic_handler(std::move(req)); + }); + register_route( + ss::httpd::cluster_json::post_cluster_partitions_topic_partition, + [this](std::unique_ptr req) { + return post_cluster_partitions_topic_partition_handler( + std::move(req)); + }); } ss::future admin_server::sync_local_state_handler( diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index 53f59b02089e3..e4fa8a8041ae7 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -391,6 +391,7 @@ class admin_server { void register_usage_routes(); void register_self_test_routes(); void register_cluster_routes(); + void register_cluster_partitions_routes(); void register_shadow_indexing_routes(); void register_wasm_transform_routes(); @@ -482,6 +483,13 @@ class admin_server { cancel_all_partitions_reconfigs_handler( std::unique_ptr); + /// Cluster partition routes + ss::future + post_cluster_partitions_topic_handler(std::unique_ptr); + ss::future + post_cluster_partitions_topic_partition_handler( + std::unique_ptr); + /// Shadow indexing routes ss::future sync_local_state_handler(std::unique_ptr); From 15f309763a105c7cb02fc38df9fb0f3ccb000ef4 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 3 Nov 2023 14:43:10 +0100 Subject: [PATCH 06/10] admin: add disabled flag to existing get_partition results --- src/v/redpanda/admin/api-doc/partition.json | 3 +++ src/v/redpanda/admin_server.cc | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/partition.json b/src/v/redpanda/admin/api-doc/partition.json index c254fefe9147c..b5ebbddbd1fa4 100644 --- a/src/v/redpanda/admin/api-doc/partition.json +++ b/src/v/redpanda/admin/api-doc/partition.json @@ -455,6 +455,9 @@ "type": "assignment" }, "description": "Replica assignments" + }, + "disabled": { + "type": "boolean" } } }, diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 7ed36a3e83ec7..363c10aa2719d 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -3431,6 +3431,8 @@ admin_server::get_partition_handler(std::unique_ptr req) { p.leader_id = *leader; } + p.disabled = _controller->get_topics_state().local().is_disabled(ntp); + return _controller->get_api() .local() .get_reconciliation_state(ntp) @@ -3465,6 +3467,10 @@ admin_server::get_topic_partitions_handler( std::vector partitions; const auto& assignments = tp_md->get().get_assignments(); partitions.reserve(assignments.size()); + + const auto* disabled_set + = _controller->get_topics_state().local().get_topic_disabled_set(tp_ns); + // Normal topic for (const auto& p_as : assignments) { partition_t p; @@ -3482,6 +3488,7 @@ admin_server::get_topic_partitions_handler( if (leader) { p.leader_id = *leader; } + p.disabled = disabled_set && disabled_set->is_disabled(p_as.id); partitions.push_back(std::move(p)); } From a7a2170537b6d9070d48206b4db8aef9568e3497 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 3 Nov 2023 14:57:31 +0100 Subject: [PATCH 07/10] admin: add GET /v1/cluster/partitions/{ns}/{topic} handler Viewing cluster-wide info for partitions of a single topic + disabled filter. --- src/v/redpanda/admin/api-doc/cluster.json | 78 +++++++++++++ src/v/redpanda/admin_server.cc | 134 ++++++++++++++++++++++ src/v/redpanda/admin_server.h | 2 + 3 files changed, 214 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/cluster.json b/src/v/redpanda/admin/api-doc/cluster.json index c687d94841904..aa6b7aad6ca43 100644 --- a/src/v/redpanda/admin/api-doc/cluster.json +++ b/src/v/redpanda/admin/api-doc/cluster.json @@ -92,6 +92,38 @@ "type": "string" } ] + }, + { + "method": "GET", + "summary": "Get cluster-level metadata for all partitions in a topic", + "nickname": "get_cluster_partitions_topic", + "type": "array", + "items": { + "type": "cluster_partition" + }, + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "topic", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "disabled", + "in": "query", + "required": false, + "type": "boolean" + } + ] } ] }, @@ -244,6 +276,52 @@ "type": "string" } } + }, + "replica_assignment": { + "id": "replica_assignment", + "description": "Replica assignment", + "properties": { + "node_id": { + "type": "long", + "description": "node id" + }, + "core": { + "type": "long", + "description": "core" + } + } + }, + "cluster_partition": { + "id": "cluster_partition", + "description": "high-level partition info known to all cluster nodes", + "properties": { + "ns": { + "type": "string", + "description": "namespace" + }, + "topic": { + "type": "string", + "description": "topic" + }, + "partition_id": { + "type": "long", + "description": "partition" + }, + "leader_id": { + "type": "long", + "description": "leader node id" + }, + "replicas": { + "type": "array", + "items": { + "type": "replica_assignment" + }, + "description": "replica assignments" + }, + "disabled": { + "type": "boolean" + } + } } } } diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 363c10aa2719d..511d8df0556f6 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -5016,6 +5016,131 @@ admin_server::post_cluster_partitions_topic_partition_handler( co_return ss::json::json_void(); } +namespace { + +struct cluster_partition_info { + ss::lw_shared_ptr ns_tp; + model::partition_id id; + std::vector replicas; + bool disabled = false; + + ss::httpd::cluster_json::cluster_partition to_json() const { + ss::httpd::cluster_json::cluster_partition ret; + ret.ns = ns_tp->ns(); + ret.topic = ns_tp->tp(); + ret.partition_id = id(); + for (auto& r : replicas) { + ss::httpd::cluster_json::replica_assignment a; + a.node_id = r.node_id; + a.core = r.shard; + ret.replicas.push(a); + } + ret.disabled = disabled; + return ret; + } +}; + +fragmented_vector topic2cluster_partitions( + model::topic_namespace ns_tp, + const cluster::assignments_set& assignments, + const cluster::topic_disabled_partitions_set* disabled_set, + std::optional disabled_filter) { + fragmented_vector ret; + + if (disabled_filter) { + // fast exits + if ( + disabled_filter.value() + && (!disabled_set || disabled_set->is_empty())) { + return ret; + } + + if ( + !disabled_filter.value() && disabled_set + && disabled_set->is_topic_disabled()) { + return ret; + } + } + + auto shared_ns_tp = ss::make_lw_shared( + std::move(ns_tp)); + + if ( + disabled_filter && disabled_filter.value() && disabled_set + && disabled_set->partitions) { + // special handling for disabled=true filter, as we hope that iterating + // over the disabled set is more optimal. + for (const auto& id : *disabled_set->partitions) { + auto as_it = assignments.find(id); + vassert( + as_it != assignments.end(), + "topic: {}, partition {} must be present", + *shared_ns_tp, + id); + + ret.push_back(cluster_partition_info{ + .ns_tp = shared_ns_tp, + .id = id, + .replicas = as_it->replicas, + .disabled = true, + }); + } + } else { + for (const auto& p_as : assignments) { + bool disabled = disabled_set && disabled_set->is_disabled(p_as.id); + + if (disabled_filter && *disabled_filter != disabled) { + continue; + } + + ret.push_back(cluster_partition_info{ + .ns_tp = shared_ns_tp, + .id = p_as.id, + .replicas = p_as.replicas, + .disabled = disabled, + }); + } + } + + std::sort(ret.begin(), ret.end(), [](const auto& l, const auto& r) { + return l.id < r.id; + }); + + return ret; +} + +} // namespace + +ss::future +admin_server::get_cluster_partitions_topic_handler( + std::unique_ptr req) { + auto ns_tp = model::topic_namespace{ + model::ns{req->param["namespace"]}, model::topic{req->param["topic"]}}; + + std::optional disabled_filter; + if (req->query_parameters.contains("disabled")) { + disabled_filter = get_boolean_query_param(*req, "disabled"); + } + + const auto& topics_state = _controller->get_topics_state().local(); + + auto topic_it = topics_state.topics_map().find(ns_tp); + if (topic_it == topics_state.topics_map().end()) { + throw ss::httpd::not_found_exception( + fmt::format("topic {} not found", ns_tp)); + } + + auto partitions = topic2cluster_partitions( + ns_tp, + topic_it->second.get_assignments(), + topics_state.get_topic_disabled_set(ns_tp), + disabled_filter); + + co_return ss::json::json_return_type(ss::json::stream_range_as_array( + lw_shared_container{std::move(partitions)}, + [](const auto& p) { return p.to_json(); })); +} + void admin_server::register_cluster_routes() { register_route( ss::httpd::cluster_json::get_cluster_health_overview, @@ -5110,6 +5235,15 @@ void admin_server::register_cluster_partitions_routes() { return post_cluster_partitions_topic_partition_handler( std::move(req)); }); + + // The following GET routes provide APIs for getting high-level partition + // info known to all cluster nodes. + + register_route( + ss::httpd::cluster_json::get_cluster_partitions_topic, + [this](std::unique_ptr req) { + return get_cluster_partitions_topic_handler(std::move(req)); + }); } ss::future admin_server::sync_local_state_handler( diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index e4fa8a8041ae7..92c3f3bfbc560 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -489,6 +489,8 @@ class admin_server { ss::future post_cluster_partitions_topic_partition_handler( std::unique_ptr); + ss::future + get_cluster_partitions_topic_handler(std::unique_ptr); /// Shadow indexing routes ss::future From 7c73a28e9222e11b61f556df1002142fcbf9aac5 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 25 Oct 2023 18:42:30 +0200 Subject: [PATCH 08/10] admin: add GET /v1/cluster/partitions handler Viewing cluster-wide info for all partitions + disabled filter. --- src/v/redpanda/admin/api-doc/cluster.json | 31 +++++++++++ src/v/redpanda/admin_server.cc | 64 +++++++++++++++++++++++ src/v/redpanda/admin_server.h | 2 + 3 files changed, 97 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/cluster.json b/src/v/redpanda/admin/api-doc/cluster.json index aa6b7aad6ca43..1f2c44908f13e 100644 --- a/src/v/redpanda/admin/api-doc/cluster.json +++ b/src/v/redpanda/admin/api-doc/cluster.json @@ -67,6 +67,37 @@ } ] }, + { + "path": "/v1/cluster/partitions", + "operations": [ + { + "method": "GET", + "summary": "Get cluster-level metadata for all partitions in the cluster", + "nickname": "get_cluster_partitions", + "type": "array", + "items": { + "type": "cluster_partition" + }, + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "disabled", + "in": "query", + "required": false, + "type": "boolean" + }, + { + "name": "with_internal", + "in": "query", + "required": false, + "type": "boolean" + } + ] + } + ] + }, { "path": "/v1/cluster/partitions/{namespace}/{topic}", "operations": [ diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 511d8df0556f6..60943d2152210 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -5111,6 +5111,65 @@ fragmented_vector topic2cluster_partitions( } // namespace +ss::future +admin_server::get_cluster_partitions_handler( + std::unique_ptr req) { + std::optional disabled_filter; + if (req->query_parameters.contains("disabled")) { + disabled_filter = get_boolean_query_param(*req, "disabled"); + } + + bool with_internal = get_boolean_query_param(*req, "with_internal"); + + const auto& topics_state = _controller->get_topics_state().local(); + + fragmented_vector topics; + auto fill_topics = [&](const auto& map) { + for (const auto& [ns_tp, _] : map) { + if (!with_internal && !model::is_user_topic(ns_tp)) { + continue; + } + topics.push_back(ns_tp); + } + }; + + if (disabled_filter && *disabled_filter) { + // optimization: if disabled filter is on, iterate only over disabled + // topics; + fill_topics(topics_state.get_disabled_partitions()); + } else { + fill_topics(topics_state.topics_map()); + } + + std::sort(topics.begin(), topics.end()); + + ss::chunked_fifo partitions; + for (const auto& ns_tp : topics) { + auto topic_it = topics_state.topics_map().find(ns_tp); + if (topic_it == topics_state.topics_map().end()) { + // probably got deleted while we were iterating. + continue; + } + + auto topic_partitions = topic2cluster_partitions( + ns_tp, + topic_it->second.get_assignments(), + topics_state.get_topic_disabled_set(ns_tp), + disabled_filter); + + std::move( + topic_partitions.begin(), + topic_partitions.end(), + std::back_inserter(partitions)); + + co_await ss::coroutine::maybe_yield(); + } + + co_return ss::json::json_return_type(ss::json::stream_range_as_array( + lw_shared_container{std::move(partitions)}, + [](const auto& p) { return p.to_json(); })); +} + ss::future admin_server::get_cluster_partitions_topic_handler( std::unique_ptr req) { @@ -5239,6 +5298,11 @@ void admin_server::register_cluster_partitions_routes() { // The following GET routes provide APIs for getting high-level partition // info known to all cluster nodes. + register_route( + ss::httpd::cluster_json::get_cluster_partitions, + [this](std::unique_ptr req) { + return get_cluster_partitions_handler(std::move(req)); + }); register_route( ss::httpd::cluster_json::get_cluster_partitions_topic, [this](std::unique_ptr req) { diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index 92c3f3bfbc560..af75a436b0783 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -489,6 +489,8 @@ class admin_server { ss::future post_cluster_partitions_topic_partition_handler( std::unique_ptr); + ss::future + get_cluster_partitions_handler(std::unique_ptr); ss::future get_cluster_partitions_topic_handler(std::unique_ptr); From 904ea5c5e6adda764dd68d073ca081332ca95a89 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 26 Oct 2023 20:55:39 +0200 Subject: [PATCH 09/10] tests/admin: add support for setting/listing disabled partitions --- tests/rptest/services/admin.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 5df248807c265..302204b9007fe 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -614,6 +614,10 @@ def get_partitions(self, return self._request('get', path, node=node).json() + def get_partition(self, ns: str, topic: str, id: int, node=None): + return self._request("GET", f"partitions/{ns}/{topic}/{id}", + node=node).json() + def get_transactions(self, topic, partition, namespace, node=None): """ Get transaction for current partition @@ -1128,3 +1132,31 @@ def reset_scrubbing_metadata(self, "POST", f"cloud_storage/reset_scrubbing_metadata/{namespace}/{topic}/{partition}", node=node) + + def get_cluster_partitions(self, + ns: str | None = None, + topic: str | None = None, + disabled: bool | None = None, + node=None): + if topic is not None: + assert ns is not None + req = f"cluster/partitions/{ns}/{topic}" + else: + assert ns is None + req = f"cluster/partitions" + + if disabled is not None: + req += f"?disabled={disabled}" + + return self._request("GET", req, node=node).json() + + def set_partitions_disabled(self, + ns: str | None = None, + topic: str | None = None, + partition: int | None = None, + value: bool = True): + if partition is not None: + req = f"cluster/partitions/{ns}/{topic}/{partition}" + else: + req = f"cluster/partitions/{ns}/{topic}" + return self._request("POST", req, json={"disabled": value}) From 62477b6fdca080d7faa7eadc834d8f3a5801fa93 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 27 Oct 2023 01:10:06 +0200 Subject: [PATCH 10/10] tests: add disabling partitions APIs test --- tests/rptest/tests/recovery_mode_test.py | 146 +++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/tests/rptest/tests/recovery_mode_test.py b/tests/rptest/tests/recovery_mode_test.py index 1f19bed39d89c..33144fa5520e8 100644 --- a/tests/rptest/tests/recovery_mode_test.py +++ b/tests/rptest/tests/recovery_mode_test.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0 import tempfile +import dataclasses from ducktape.utils.util import wait_until @@ -15,6 +16,7 @@ from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST from rptest.services.cluster import cluster from rptest.clients.rpk import RpkTool, RpkException +from rptest.services.admin import Admin from rptest.services.rpk_producer import RpkProducer from rptest.util import wait_until_result @@ -196,3 +198,147 @@ def partitions_ready(): quiet=True).rstrip().split('\n') # check that group seek was successful assert len(consumed) == 2000 + + +@dataclasses.dataclass +class PartitionInfo: + ns: str + topic: str + partition_id: int + disabled: bool + + def from_json(json): + return PartitionInfo(**dict( + (f.name, json[f.name]) for f in dataclasses.fields(PartitionInfo))) + + +class DisablingPartitionsTest(RedpandaTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, + num_brokers=4, + extra_rp_conf={"controller_snapshot_max_age_sec": 5}, + **kwargs) + + def sync(self): + admin = Admin(self.redpanda) + first = self.redpanda.nodes[0] + rest = self.redpanda.nodes[1:] + + def equal_everywhere(): + first_res = admin.get_cluster_partitions(node=first) + return all( + admin.get_cluster_partitions(node=n) == first_res + for n in rest) + + # give some time for controller updates to propagate + wait_until( + equal_everywhere, + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait for partitions metadata to equalize") + + @cluster(num_nodes=4) + def test_apis(self): + rpk = RpkTool(self.redpanda) + admin = Admin(self.redpanda) + + topics = ["mytopic1", "mytopic2", "mytopic3"] + for topic in topics: + rpk.create_topic(topic, partitions=3, replicas=3) + + admin.set_partitions_disabled(ns="kafka", topic="mytopic1") + + for p in [1, 2]: + admin.set_partitions_disabled(ns="kafka", + topic="mytopic2", + partition=p) + admin.set_partitions_disabled(ns="kafka", + topic="mytopic2", + partition=2, + value=False) + + admin.set_partitions_disabled(ns="kafka", topic="mytopic3") + admin.set_partitions_disabled(ns="kafka", + topic="mytopic3", + partition=1, + value=False) + + self.sync() + + def pi(topic_partition, disabled=False): + topic, partition = topic_partition.split('/') + return PartitionInfo('kafka', topic, int(partition), disabled) + + all_partitions = [ + pi('mytopic1/0', True), + pi('mytopic1/1', True), + pi('mytopic1/2', True), + pi('mytopic2/0', False), + pi('mytopic2/1', True), + pi('mytopic2/2', False), + pi('mytopic3/0', True), + pi('mytopic3/1', False), + pi('mytopic3/2', True), + ] + + def filtered(topic, partition, disabled): + def filter(p): + if topic is not None and p.topic != topic: + return False + if partition is not None and p.partition_id != partition: + return False + if disabled is not None and p.disabled != disabled: + return False + return True + + res = [p for p in all_partitions if filter(p)] + if partition is not None: + assert len(res) == 1 + res = res[0] + + return res + + def get(topic=None, partition=None, disabled=None): + if topic is None and partition is None: + ns = None + else: + ns = "kafka" + if partition is None: + json = admin.get_cluster_partitions(ns=ns, + topic=topic, + disabled=disabled) + + return [PartitionInfo.from_json(p) for p in json] + else: + json = admin.get_partition(ns, topic, partition) + return PartitionInfo.from_json(json) + + def check_everything(): + for topic in [None] + topics: + if topic is None: + partitions = [None] + else: + partitions = [None] + list(range(3)) + + for partition in partitions: + if partition is None: + disabled_list = [None, True, False] + else: + disabled_list = [None] + + for disabled in disabled_list: + filter = (topic, partition, disabled) + expected = filtered(*filter) + got = get(*filter) + self.logger.debug(f"{filter=} {got=} {expected=}") + assert got == expected + + check_everything() + + for n in self.redpanda.nodes: + self.redpanda.wait_for_controller_snapshot(n) + + self.redpanda.restart_nodes(self.redpanda.nodes) + self.redpanda.wait_for_membership(first_start=False) + + check_everything()