From 7c2623acaee63e9062be36b24613dc3690b7a894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 4 Jun 2024 20:38:57 +0100 Subject: [PATCH 1/6] k/quota_mgr: avoid a copy using move --- src/v/kafka/server/quota_manager.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc index 813c895f7449..b3f85b767db4 100644 --- a/src/v/kafka/server/quota_manager.cc +++ b/src/v/kafka/server/quota_manager.cc @@ -74,8 +74,9 @@ ss::future quota_manager::maybe_add_and_retrieve_quota( auto it = _client_quotas->find(qid); if (it == _client_quotas->end()) { co_await container().invoke_on( - _client_quotas.shard_id(), - [qid, now](quota_manager& me) { return me.add_quota_id(qid, now); }); + _client_quotas.shard_id(), [qid, now](quota_manager& me) mutable { + return me.add_quota_id(std::move(qid), now); + }); it = _client_quotas->find(qid); if (it == _client_quotas->end()) { From 7c3b4ea6c1be77ef7454fc72ab7fe3a6ebd5537c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 4 Jun 2024 19:47:56 +0100 Subject: [PATCH 2/6] k/client_quota_translator_test: reset configs between tests We should start each test with clean configs to prevent them from interfering with eachother. --- .../server/tests/client_quota_translator_test.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index 7d5aae507773..9f4a6a3b0afd 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -53,8 +53,16 @@ const auto CHECK_VARIANT_EQ = [](auto expected, auto got) { BOOST_CHECK_EQUAL(expected, get(got)); }; +void reset_configs() { + config::shard_local_cfg().target_quota_byte_rate.reset(); + config::shard_local_cfg().target_fetch_quota_byte_rate.reset(); + config::shard_local_cfg().kafka_admin_topic_api_rate.reset(); + config::shard_local_cfg().kafka_client_group_byte_rate_quota.reset(); + config::shard_local_cfg().kafka_client_group_fetch_byte_rate_quota.reset(); +} + BOOST_AUTO_TEST_CASE(quota_translator_default_test) { - client_quota_translator tr; + reset_configs(); auto default_limits = client_quota_limits{ .produce_limit = 2147483648, @@ -68,6 +76,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_default_test) { } BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) { + reset_configs(); config::shard_local_cfg().target_quota_byte_rate.set_value(1111); config::shard_local_cfg().target_fetch_quota_byte_rate.set_value(2222); config::shard_local_cfg().kafka_admin_topic_api_rate.set_value(3333); @@ -86,6 +95,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) { } BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { + reset_configs(); constexpr auto P_DEF = 1111; constexpr auto F_DEF = 2222; constexpr auto PM_DEF = 3333; From bd6e3cbb4e252cae28a83b034c3aa783329a6a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Mon, 3 Jun 2024 16:49:15 +0100 Subject: [PATCH 3/6] k/client_quota_translator_test: fix client_id name clash A follow up commit adds kafka/types.h as a transitive dependency to the client_quota_translator_test.cc which causes the client_id constant in the test file to clash with the client_id type alias from types.h. --- .../server/tests/client_quota_translator_test.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index 9f4a6a3b0afd..6b428f5d9058 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -19,8 +19,8 @@ using namespace kafka; -const ss::sstring client_id = "franz-go"; -const tracker_key client_id_key = k_client_id{client_id}; +const ss::sstring test_client_id = "franz-go"; +const tracker_key test_client_id_key = k_client_id{test_client_id}; constexpr std::string_view raw_basic_produce_config = R"([ { @@ -70,8 +70,8 @@ BOOST_AUTO_TEST_CASE(quota_translator_default_test) { .partition_mutation_limit = std::nullopt, }; auto [key, limits] = tr.find_quota( - {client_quota_type::produce_quota, client_id}); - BOOST_CHECK_EQUAL(client_id_key, key); + {client_quota_type::produce_quota, test_client_id}); + BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(default_limits, limits); } @@ -89,8 +89,8 @@ BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) { .partition_mutation_limit = 3333, }; auto [key, limits] = tr.find_quota( - {client_quota_type::produce_quota, client_id}); - BOOST_CHECK_EQUAL(client_id_key, key); + {client_quota_type::produce_quota, test_client_id}); + BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(expected_limits, limits); } From a659eef57449f7fdf5a72b0c80edab035d21498e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 4 Jun 2024 18:02:37 +0100 Subject: [PATCH 4/6] k/quota_translator: fix quota values for group-based keys This fixes a bug in the quota values being returned for group-based quota keys where we would incorrectly fall back to the default values for group-based keys when the correct quota is really "not applicable" (i.e. `std::nullopt`). This is not an observable bug because these quota values are never really accessed. If the request had a group-based applicable quota, it would both have a group-based key and its limit. It's not possible for a request to have a group-based key and a default quota value. We fix this bug now because we are about to integrate with the quota store, which makes this bug more apparent. --- src/v/kafka/server/client_quota_translator.cc | 25 ++++++++++++++----- src/v/kafka/server/client_quota_translator.h | 5 +++- .../tests/client_quota_translator_test.cc | 16 ++++++------ .../kafka/server/tests/quota_manager_test.cc | 21 ++++++++++------ 4 files changed, 45 insertions(+), 22 deletions(-) diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index a882a8b872e3..280be749c6bc 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -42,19 +42,20 @@ client_quota_translator::client_quota_translator() config::shard_local_cfg() .kafka_client_group_fetch_byte_rate_quota.bind()) {} -uint64_t client_quota_translator::get_client_target_produce_tp_rate( +std::optional +client_quota_translator::get_client_target_produce_tp_rate( const tracker_key& quota_id) { return ss::visit( quota_id, - [this](const k_client_id&) -> uint64_t { + [this](const k_client_id&) -> std::optional { return _default_target_produce_tp_rate(); }, - [this](const k_group_name& k) -> uint64_t { + [this](const k_group_name& k) -> std::optional { auto group = _target_produce_tp_rate_per_client_group().find(k); if (group != _target_produce_tp_rate_per_client_group().end()) { return group->second.quota; } - return _default_target_produce_tp_rate(); + return {}; }); } @@ -71,10 +72,21 @@ client_quota_translator::get_client_target_fetch_tp_rate( if (group != _target_fetch_tp_rate_per_client_group().end()) { return group->second.quota; } - return _default_target_fetch_tp_rate(); + return {}; }); } +std::optional +client_quota_translator::get_client_target_partition_mutation_rate( + const tracker_key& quota_id) { + return ss::visit( + quota_id, + [this](const k_client_id&) -> std::optional { + return _target_partition_mutation_quota(); + }, + [](const k_group_name&) -> std::optional { return {}; }); +} + namespace { // If client is part of some group then client quota ID is a group // else client quota ID is client_id @@ -140,7 +152,8 @@ client_quota_translator::find_quota_value(const tracker_key& key) { return client_quota_limits{ .produce_limit = get_client_target_produce_tp_rate(key), .fetch_limit = get_client_target_fetch_tp_rate(key), - .partition_mutation_limit = _target_partition_mutation_quota(), + .partition_mutation_limit = get_client_target_partition_mutation_rate( + key), }; } diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index 924a20ab9605..89fd8671d7ed 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -102,9 +102,12 @@ class client_quota_translator { tracker_key get_partition_mutation_key(std::optional client_id); - uint64_t get_client_target_produce_tp_rate(const tracker_key& quota_id); + std::optional + get_client_target_produce_tp_rate(const tracker_key& quota_id); std::optional get_client_target_fetch_tp_rate(const tracker_key& quota_id); + std::optional + get_client_target_partition_mutation_rate(const tracker_key& quota_id); config::binding _default_target_produce_tp_rate; config::binding> _default_target_fetch_tp_rate; diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index 6b428f5d9058..70a2b576335e 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -156,16 +156,16 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { // Check limits for the franz-go groups auto franz_go_produce_limits = client_quota_limits{ .produce_limit = 4096, - .fetch_limit = F_DEF, - .partition_mutation_limit = PM_DEF, + .fetch_limit = {}, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( franz_go_produce_limits, tr.find_quota_value(k_group_name{"franz-go-produce-group"})); auto franz_go_fetch_limits = client_quota_limits{ - .produce_limit = P_DEF, + .produce_limit = {}, .fetch_limit = 4097, - .partition_mutation_limit = PM_DEF, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( franz_go_fetch_limits, @@ -174,16 +174,16 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { // Check limits for the not-franz-go groups auto not_franz_go_produce_limits = client_quota_limits{ .produce_limit = 2048, - .fetch_limit = F_DEF, - .partition_mutation_limit = PM_DEF, + .fetch_limit = {}, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( not_franz_go_produce_limits, tr.find_quota_value(k_group_name{"not-franz-go-produce-group"})); auto not_franz_go_fetch_limits = client_quota_limits{ - .produce_limit = P_DEF, + .produce_limit = {}, .fetch_limit = 2049, - .partition_mutation_limit = PM_DEF, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( not_franz_go_fetch_limits, diff --git a/src/v/kafka/server/tests/quota_manager_test.cc b/src/v/kafka/server/tests/quota_manager_test.cc index 0c8c1de470dd..406250ab8ddc 100644 --- a/src/v/kafka/server/tests/quota_manager_test.cc +++ b/src/v/kafka/server/tests/quota_manager_test.cc @@ -196,8 +196,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 4096); BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); BOOST_CHECK_EQUAL(it->second->tp_fetch_rate->rate(), 4097); - BOOST_REQUIRE(it->second->pm_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->pm_rate->rate(), 1026); + BOOST_REQUIRE(!it->second->pm_rate.has_value()); } { ss::sstring client_id = "not-franz-go"; @@ -211,8 +210,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 2048); BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); BOOST_CHECK_EQUAL(it->second->tp_fetch_rate->rate(), 2049); - BOOST_REQUIRE(it->second->pm_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->pm_rate->rate(), 1026); + BOOST_REQUIRE(!it->second->pm_rate.has_value()); } { ss::sstring client_id = "unconfigured"; @@ -233,6 +231,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { SEASTAR_THREAD_TEST_CASE(update_test) { using clock = kafka::quota_manager::clock; using k_group_name = kafka::k_group_name; + using k_client_id = kafka::k_client_id; fixture f; f.start().get(); auto stop = ss::defer([&] { f.stop().get(); }); @@ -294,16 +293,24 @@ SEASTAR_THREAD_TEST_CASE(update_test) { conf.kafka_client_group_byte_rate_quota.set_value(produce_config); }).get(); - // Check the rate has been updated + // Check the produce rate has been updated on the group auto it = f.buckets_map.local()->find( k_group_name{client_id + "-group"}); BOOST_REQUIRE(it != f.buckets_map.local()->end()); - BOOST_REQUIRE(it->second->tp_produce_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 1024); + BOOST_CHECK(!it->second->tp_produce_rate.has_value()); // Check fetch is the same bucket BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); auto delay = f.sqm.local().throttle_fetch_tp(client_id, now).get(); BOOST_CHECK_EQUAL(delay / 1ms, 1000); + + // Check the new produce rate now applies + f.sqm.local() + .record_produce_tp_and_throttle(client_id, 8192, now) + .get(); + auto client_it = f.buckets_map.local()->find(k_client_id{client_id}); + BOOST_REQUIRE(client_it != f.buckets_map.local()->end()); + BOOST_REQUIRE(client_it->second->tp_produce_rate.has_value()); + BOOST_CHECK_EQUAL(client_it->second->tp_produce_rate->rate(), 1024); } } From bed6e22825f2909c44c17794ceff4db7e3d111a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Mon, 3 Jun 2024 17:53:22 +0100 Subject: [PATCH 5/6] k/client_quota_translator: inject client_quota::store dependency This wires the sharded quota_store into the client_quota_translator. There's no behaviour change yet (the quota store is not accessed yet), so this is merely a refactor. --- src/v/kafka/server/client_quota_translator.cc | 8 ++- src/v/kafka/server/client_quota_translator.h | 7 ++- src/v/kafka/server/quota_manager.cc | 6 +- src/v/kafka/server/quota_manager.h | 4 +- src/v/kafka/server/tests/CMakeLists.txt | 10 +++- .../tests/client_quota_translator_test.cc | 59 ++++++++++++------- .../kafka/server/tests/quota_manager_bench.cc | 6 +- .../kafka/server/tests/quota_manager_test.cc | 10 +++- src/v/redpanda/application.cc | 6 +- 9 files changed, 83 insertions(+), 33 deletions(-) diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index 280be749c6bc..54a573bb8b21 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -29,9 +29,11 @@ std::ostream& operator<<(std::ostream& os, const client_quota_limits& l) { return os; } -client_quota_translator::client_quota_translator() - : _default_target_produce_tp_rate( - config::shard_local_cfg().target_quota_byte_rate.bind()) +client_quota_translator::client_quota_translator( + ss::sharded& quota_store) + : _quota_store(quota_store) + , _default_target_produce_tp_rate( + config::shard_local_cfg().target_quota_byte_rate.bind()) , _default_target_fetch_tp_rate( config::shard_local_cfg().target_fetch_quota_byte_rate.bind()) , _target_partition_mutation_quota( diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index 89fd8671d7ed..1ac3168afd51 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -12,10 +12,12 @@ #pragma once #include "base/seastarx.h" +#include "cluster/fwd.h" #include "config/client_group_byte_rate_quota.h" #include "config/property.h" #include "utils/named_type.h" +#include #include #include @@ -75,7 +77,8 @@ class client_quota_translator { public: using on_change_fn = std::function; - client_quota_translator(); + explicit client_quota_translator( + ss::sharded&); /// Returns the quota tracker key applicable to the given quota context /// Note: because the client quotas configured for produce/fetch/pm might be @@ -109,6 +112,8 @@ class client_quota_translator { std::optional get_client_target_partition_mutation_rate(const tracker_key& quota_id); + ss::sharded& _quota_store; + config::binding _default_target_produce_tp_rate; config::binding> _default_target_fetch_tp_rate; config::binding> _target_partition_mutation_quota; diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc index b3f85b767db4..a80d0b84dca6 100644 --- a/src/v/kafka/server/quota_manager.cc +++ b/src/v/kafka/server/quota_manager.cc @@ -37,13 +37,15 @@ namespace kafka { using clock = quota_manager::clock; -quota_manager::quota_manager(client_quotas_t& client_quotas) +quota_manager::quota_manager( + client_quotas_t& client_quotas, + ss::sharded& client_quota_store) : _default_num_windows(config::shard_local_cfg().default_num_windows.bind()) , _default_window_width(config::shard_local_cfg().default_window_sec.bind()) , _replenish_threshold( config::shard_local_cfg().kafka_throughput_replenish_threshold.bind()) , _client_quotas{client_quotas} - , _translator{} + , _translator{client_quota_store} , _gc_freq(config::shard_local_cfg().quota_manager_gc_sec()) , _max_delay(config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) { if (seastar::this_shard_id() == _client_quotas.shard_id()) { diff --git a/src/v/kafka/server/quota_manager.h b/src/v/kafka/server/quota_manager.h index 74a3e731d204..2ac2816cd310 100644 --- a/src/v/kafka/server/quota_manager.h +++ b/src/v/kafka/server/quota_manager.h @@ -73,7 +73,9 @@ class quota_manager : public ss::peering_sharded_service { = absl::node_hash_map>; using client_quotas_t = ssx::sharded_ptr; - explicit quota_manager(client_quotas_t& client_quotas); + quota_manager( + client_quotas_t& client_quotas, + ss::sharded& client_quota_store); quota_manager(const quota_manager&) = delete; quota_manager& operator=(const quota_manager&) = delete; quota_manager(quota_manager&&) = delete; diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index b6b25a5c97c2..bcf4e20f8fa9 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -13,7 +13,6 @@ rp_test( fetch_unit_test.cc config_utils_test.cc config_response_utils_test.cc - client_quota_translator_test.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES Boost::unit_test_framework v::kafka LABELS kafka @@ -78,6 +77,15 @@ rp_test( LABELS kafka ) +rp_test( + FIXTURE_TEST + BINARY_NAME quota_translator + SOURCES client_quota_translator_test.cc + LIBRARIES v::seastar_testing_main v::kafka + ARGS "-- -c 1" + LABELS kafka +) + rp_test( BENCHMARK_TEST BINARY_NAME kafka_fetch_plan diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index 70a2b576335e..004720c0d678 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -8,9 +8,12 @@ // by the Apache License, Version 2.0 #include "base/seastarx.h" +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/client_quota_translator.h" +#include + #include #include #include @@ -61,40 +64,53 @@ void reset_configs() { config::shard_local_cfg().kafka_client_group_fetch_byte_rate_quota.reset(); } -BOOST_AUTO_TEST_CASE(quota_translator_default_test) { +struct fixture { + ss::sharded quota_store; + kafka::client_quota_translator tr; + + fixture() + : tr(std::ref(quota_store)) { + quota_store.start().get(); + } + + ~fixture() { quota_store.stop().get(); } +}; + +SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) { reset_configs(); + fixture f; auto default_limits = client_quota_limits{ .produce_limit = 2147483648, .fetch_limit = std::nullopt, .partition_mutation_limit = std::nullopt, }; - auto [key, limits] = tr.find_quota( + auto [key, limits] = f.tr.find_quota( {client_quota_type::produce_quota, test_client_id}); BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(default_limits, limits); } -BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) { +SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) { reset_configs(); config::shard_local_cfg().target_quota_byte_rate.set_value(1111); config::shard_local_cfg().target_fetch_quota_byte_rate.set_value(2222); config::shard_local_cfg().kafka_admin_topic_api_rate.set_value(3333); - client_quota_translator tr; + fixture f; auto expected_limits = client_quota_limits{ .produce_limit = 1111, .fetch_limit = 2222, .partition_mutation_limit = 3333, }; - auto [key, limits] = tr.find_quota( + auto [key, limits] = f.tr.find_quota( {client_quota_type::produce_quota, test_client_id}); BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(expected_limits, limits); } -BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { +SEASTAR_THREAD_TEST_CASE(quota_translator_client_group_test) { reset_configs(); constexpr auto P_DEF = 1111; constexpr auto F_DEF = 2222; @@ -110,18 +126,19 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { .kafka_client_group_fetch_byte_rate_quota.set_value( YAML::Load(std::string(raw_basic_fetch_config))); - client_quota_translator tr; + fixture f; // Stage 1 - Start by checking that tracker_key's are correctly detected // for various client ids - auto get_produce_key = [&tr](auto client_id) { - return tr.find_quota_key({client_quota_type::produce_quota, client_id}); + auto get_produce_key = [&f](auto client_id) { + return f.tr.find_quota_key( + {client_quota_type::produce_quota, client_id}); }; - auto get_fetch_key = [&tr](auto client_id) { - return tr.find_quota_key({client_quota_type::fetch_quota, client_id}); + auto get_fetch_key = [&f](auto client_id) { + return f.tr.find_quota_key({client_quota_type::fetch_quota, client_id}); }; - auto get_mutation_key = [&tr](auto client_id) { - return tr.find_quota_key( + auto get_mutation_key = [&f](auto client_id) { + return f.tr.find_quota_key( {client_quota_type::partition_mutation_quota, client_id}); }; @@ -161,7 +178,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { }; BOOST_CHECK_EQUAL( franz_go_produce_limits, - tr.find_quota_value(k_group_name{"franz-go-produce-group"})); + f.tr.find_quota_value(k_group_name{"franz-go-produce-group"})); auto franz_go_fetch_limits = client_quota_limits{ .produce_limit = {}, .fetch_limit = 4097, @@ -169,7 +186,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { }; BOOST_CHECK_EQUAL( franz_go_fetch_limits, - tr.find_quota_value(k_group_name{"franz-go-fetch-group"})); + f.tr.find_quota_value(k_group_name{"franz-go-fetch-group"})); // Check limits for the not-franz-go groups auto not_franz_go_produce_limits = client_quota_limits{ @@ -179,7 +196,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { }; BOOST_CHECK_EQUAL( not_franz_go_produce_limits, - tr.find_quota_value(k_group_name{"not-franz-go-produce-group"})); + f.tr.find_quota_value(k_group_name{"not-franz-go-produce-group"})); auto not_franz_go_fetch_limits = client_quota_limits{ .produce_limit = {}, .fetch_limit = 2049, @@ -187,7 +204,7 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { }; BOOST_CHECK_EQUAL( not_franz_go_fetch_limits, - tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"})); + f.tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"})); // Check limits for the non-client-group keys auto default_limits = client_quota_limits{ @@ -196,10 +213,10 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { .partition_mutation_limit = PM_DEF, }; BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"unknown"})); - BOOST_CHECK_EQUAL(default_limits, tr.find_quota_value(k_client_id{""})); + default_limits, f.tr.find_quota_value(k_client_id{"unknown"})); + BOOST_CHECK_EQUAL(default_limits, f.tr.find_quota_value(k_client_id{""})); BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"franz-go"})); + default_limits, f.tr.find_quota_value(k_client_id{"franz-go"})); BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"not-franz-go"})); + default_limits, f.tr.find_quota_value(k_client_id{"not-franz-go"})); } diff --git a/src/v/kafka/server/tests/quota_manager_bench.cc b/src/v/kafka/server/tests/quota_manager_bench.cc index 956244a25dd6..b4646566816c 100644 --- a/src/v/kafka/server/tests/quota_manager_bench.cc +++ b/src/v/kafka/server/tests/quota_manager_bench.cc @@ -8,6 +8,7 @@ * the Business Source License, use of this software will be governed * by the Apache License, Version 2.0 */ +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/quota_manager.h" @@ -61,9 +62,11 @@ send_requests(kafka::quota_manager& qm, size_t count, bool use_unique) { } ss::future<> test_quota_manager(size_t count, bool use_unique) { + ss::sharded quota_store; kafka::quota_manager::client_quotas_t buckets_map; ss::sharded sqm; - co_await sqm.start(std::ref(buckets_map)); + co_await quota_store.start(); + co_await sqm.start(std::ref(buckets_map), std::ref(quota_store)); co_await sqm.invoke_on_all(&kafka::quota_manager::start); perf_tests::start_measuring_time(); @@ -72,6 +75,7 @@ ss::future<> test_quota_manager(size_t count, bool use_unique) { }); perf_tests::stop_measuring_time(); co_await sqm.stop(); + co_await quota_store.stop(); } struct test_case { diff --git a/src/v/kafka/server/tests/quota_manager_test.cc b/src/v/kafka/server/tests/quota_manager_test.cc index 406250ab8ddc..c7a9ac988bf5 100644 --- a/src/v/kafka/server/tests/quota_manager_test.cc +++ b/src/v/kafka/server/tests/quota_manager_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/client_quota_translator.h" #include "kafka/server/quota_manager.h" @@ -27,14 +28,19 @@ const static auto client_id = "franz-go"; struct fixture { kafka::quota_manager::client_quotas_t buckets_map; + ss::sharded quota_store; ss::sharded sqm; ss::future<> start() { - co_await sqm.start(std::ref(buckets_map)); + co_await quota_store.start(); + co_await sqm.start(std::ref(buckets_map), std::ref(quota_store)); co_await sqm.invoke_on_all(&kafka::quota_manager::start); } - ss::future<> stop() { co_await sqm.stop(); } + ss::future<> stop() { + co_await sqm.stop(); + co_await quota_store.stop(); + } }; template diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index fcf901c034ab..7d8e9290832d 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1670,7 +1670,11 @@ void application::wire_up_redpanda_services( // metrics and quota management syschecks::systemd_message("Adding kafka quota managers").get(); - construct_service(quota_mgr, std::ref(quota_mgr_state)).get(); + construct_service( + quota_mgr, + std::ref(quota_mgr_state), + std::ref(controller->get_quota_store())) + .get(); construct_service(snc_quota_mgr, std::ref(snc_node_quota)).get(); syschecks::systemd_message("Creating auditing subsystem").get(); From c1b933d6923684d5815a1da8fa137391f9b66765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 4 Jun 2024 19:08:48 +0100 Subject: [PATCH 6/6] k/quota_translator: refactor quota lookup This refactors the quota lookup methods to be generic across the different request types in a way that allows us to hook in quotas from the quota store in follow up commits. Refactor only, no behaviour change. --- src/v/kafka/server/client_quota_translator.cc | 50 +++++++++---------- src/v/kafka/server/client_quota_translator.h | 5 ++ 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index 54a573bb8b21..ce17efc343c1 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -47,46 +47,46 @@ client_quota_translator::client_quota_translator( std::optional client_quota_translator::get_client_target_produce_tp_rate( const tracker_key& quota_id) { - return ss::visit( + return get_client_quota_value( quota_id, - [this](const k_client_id&) -> std::optional { - return _default_target_produce_tp_rate(); - }, - [this](const k_group_name& k) -> std::optional { - auto group = _target_produce_tp_rate_per_client_group().find(k); - if (group != _target_produce_tp_rate_per_client_group().end()) { - return group->second.quota; - } - return {}; - }); + _target_produce_tp_rate_per_client_group(), + _default_target_produce_tp_rate()); } std::optional client_quota_translator::get_client_target_fetch_tp_rate( const tracker_key& quota_id) { - return ss::visit( + return get_client_quota_value( quota_id, - [this](const k_client_id&) -> std::optional { - return _default_target_fetch_tp_rate(); - }, - [this](const k_group_name& k) -> std::optional { - auto group = _target_fetch_tp_rate_per_client_group().find(k); - if (group != _target_fetch_tp_rate_per_client_group().end()) { - return group->second.quota; - } - return {}; - }); + _target_fetch_tp_rate_per_client_group(), + _default_target_fetch_tp_rate()); } std::optional client_quota_translator::get_client_target_partition_mutation_rate( const tracker_key& quota_id) { + return get_client_quota_value( + quota_id, {}, _target_partition_mutation_quota()); +} + +std::optional client_quota_translator::get_client_quota_value( + const tracker_key& quota_id, + const std::unordered_map& + group_quota_config, + std::optional default_value_config) { return ss::visit( quota_id, - [this](const k_client_id&) -> std::optional { - return _target_partition_mutation_quota(); + [&default_value_config](const k_client_id&) -> std::optional { + return default_value_config; }, - [](const k_group_name&) -> std::optional { return {}; }); + [&group_quota_config](const k_group_name& k) -> std::optional { + auto group = group_quota_config.find(k); + if (group != group_quota_config.end()) { + return group->second.quota; + } + + return {}; + }); } namespace { diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index 1ac3168afd51..a6aa820585e0 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -111,6 +111,11 @@ class client_quota_translator { get_client_target_fetch_tp_rate(const tracker_key& quota_id); std::optional get_client_target_partition_mutation_rate(const tracker_key& quota_id); + std::optional get_client_quota_value( + const tracker_key& quota_id, + const std::unordered_map& + group_quota_config, + std::optional default_value_config); ss::sharded& _quota_store;