diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index a882a8b872e3..ce17efc343c1 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -29,9 +29,11 @@ std::ostream& operator<<(std::ostream& os, const client_quota_limits& l) { return os; } -client_quota_translator::client_quota_translator() - : _default_target_produce_tp_rate( - config::shard_local_cfg().target_quota_byte_rate.bind()) +client_quota_translator::client_quota_translator( + ss::sharded& quota_store) + : _quota_store(quota_store) + , _default_target_produce_tp_rate( + config::shard_local_cfg().target_quota_byte_rate.bind()) , _default_target_fetch_tp_rate( config::shard_local_cfg().target_fetch_quota_byte_rate.bind()) , _target_partition_mutation_quota( @@ -42,36 +44,48 @@ client_quota_translator::client_quota_translator() config::shard_local_cfg() .kafka_client_group_fetch_byte_rate_quota.bind()) {} -uint64_t client_quota_translator::get_client_target_produce_tp_rate( +std::optional +client_quota_translator::get_client_target_produce_tp_rate( const tracker_key& quota_id) { - return ss::visit( + return get_client_quota_value( quota_id, - [this](const k_client_id&) -> uint64_t { - return _default_target_produce_tp_rate(); - }, - [this](const k_group_name& k) -> uint64_t { - auto group = _target_produce_tp_rate_per_client_group().find(k); - if (group != _target_produce_tp_rate_per_client_group().end()) { - return group->second.quota; - } - return _default_target_produce_tp_rate(); - }); + _target_produce_tp_rate_per_client_group(), + _default_target_produce_tp_rate()); } std::optional client_quota_translator::get_client_target_fetch_tp_rate( const tracker_key& quota_id) { + return get_client_quota_value( + quota_id, + _target_fetch_tp_rate_per_client_group(), + _default_target_fetch_tp_rate()); +} + +std::optional +client_quota_translator::get_client_target_partition_mutation_rate( + const tracker_key& quota_id) { + return get_client_quota_value( + quota_id, {}, _target_partition_mutation_quota()); +} + +std::optional client_quota_translator::get_client_quota_value( + const tracker_key& quota_id, + const std::unordered_map& + group_quota_config, + std::optional default_value_config) { return ss::visit( quota_id, - [this](const k_client_id&) -> std::optional { - return _default_target_fetch_tp_rate(); + [&default_value_config](const k_client_id&) -> std::optional { + return default_value_config; }, - [this](const k_group_name& k) -> std::optional { - auto group = _target_fetch_tp_rate_per_client_group().find(k); - if (group != _target_fetch_tp_rate_per_client_group().end()) { + [&group_quota_config](const k_group_name& k) -> std::optional { + auto group = group_quota_config.find(k); + if (group != group_quota_config.end()) { return group->second.quota; } - return _default_target_fetch_tp_rate(); + + return {}; }); } @@ -140,7 +154,8 @@ client_quota_translator::find_quota_value(const tracker_key& key) { return client_quota_limits{ .produce_limit = get_client_target_produce_tp_rate(key), .fetch_limit = get_client_target_fetch_tp_rate(key), - .partition_mutation_limit = _target_partition_mutation_quota(), + .partition_mutation_limit = get_client_target_partition_mutation_rate( + key), }; } diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index 924a20ab9605..a6aa820585e0 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -12,10 +12,12 @@ #pragma once #include "base/seastarx.h" +#include "cluster/fwd.h" #include "config/client_group_byte_rate_quota.h" #include "config/property.h" #include "utils/named_type.h" +#include #include #include @@ -75,7 +77,8 @@ class client_quota_translator { public: using on_change_fn = std::function; - client_quota_translator(); + explicit client_quota_translator( + ss::sharded&); /// Returns the quota tracker key applicable to the given quota context /// Note: because the client quotas configured for produce/fetch/pm might be @@ -102,9 +105,19 @@ class client_quota_translator { tracker_key get_partition_mutation_key(std::optional client_id); - uint64_t get_client_target_produce_tp_rate(const tracker_key& quota_id); + std::optional + get_client_target_produce_tp_rate(const tracker_key& quota_id); std::optional get_client_target_fetch_tp_rate(const tracker_key& quota_id); + std::optional + get_client_target_partition_mutation_rate(const tracker_key& quota_id); + std::optional get_client_quota_value( + const tracker_key& quota_id, + const std::unordered_map& + group_quota_config, + std::optional default_value_config); + + ss::sharded& _quota_store; config::binding _default_target_produce_tp_rate; config::binding> _default_target_fetch_tp_rate; diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc index 813c895f7449..a80d0b84dca6 100644 --- a/src/v/kafka/server/quota_manager.cc +++ b/src/v/kafka/server/quota_manager.cc @@ -37,13 +37,15 @@ namespace kafka { using clock = quota_manager::clock; -quota_manager::quota_manager(client_quotas_t& client_quotas) +quota_manager::quota_manager( + client_quotas_t& client_quotas, + ss::sharded& client_quota_store) : _default_num_windows(config::shard_local_cfg().default_num_windows.bind()) , _default_window_width(config::shard_local_cfg().default_window_sec.bind()) , _replenish_threshold( config::shard_local_cfg().kafka_throughput_replenish_threshold.bind()) , _client_quotas{client_quotas} - , _translator{} + , _translator{client_quota_store} , _gc_freq(config::shard_local_cfg().quota_manager_gc_sec()) , _max_delay(config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) { if (seastar::this_shard_id() == _client_quotas.shard_id()) { @@ -74,8 +76,9 @@ ss::future quota_manager::maybe_add_and_retrieve_quota( auto it = _client_quotas->find(qid); if (it == _client_quotas->end()) { co_await container().invoke_on( - _client_quotas.shard_id(), - [qid, now](quota_manager& me) { return me.add_quota_id(qid, now); }); + _client_quotas.shard_id(), [qid, now](quota_manager& me) mutable { + return me.add_quota_id(std::move(qid), now); + }); it = _client_quotas->find(qid); if (it == _client_quotas->end()) { diff --git a/src/v/kafka/server/quota_manager.h b/src/v/kafka/server/quota_manager.h index 74a3e731d204..2ac2816cd310 100644 --- a/src/v/kafka/server/quota_manager.h +++ b/src/v/kafka/server/quota_manager.h @@ -73,7 +73,9 @@ class quota_manager : public ss::peering_sharded_service { = absl::node_hash_map>; using client_quotas_t = ssx::sharded_ptr; - explicit quota_manager(client_quotas_t& client_quotas); + quota_manager( + client_quotas_t& client_quotas, + ss::sharded& client_quota_store); quota_manager(const quota_manager&) = delete; quota_manager& operator=(const quota_manager&) = delete; quota_manager(quota_manager&&) = delete; diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index b6b25a5c97c2..bcf4e20f8fa9 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -13,7 +13,6 @@ rp_test( fetch_unit_test.cc config_utils_test.cc config_response_utils_test.cc - client_quota_translator_test.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES Boost::unit_test_framework v::kafka LABELS kafka @@ -78,6 +77,15 @@ rp_test( LABELS kafka ) +rp_test( + FIXTURE_TEST + BINARY_NAME quota_translator + SOURCES client_quota_translator_test.cc + LIBRARIES v::seastar_testing_main v::kafka + ARGS "-- -c 1" + LABELS kafka +) + rp_test( BENCHMARK_TEST BINARY_NAME kafka_fetch_plan diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index 7d5aae507773..004720c0d678 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -8,9 +8,12 @@ // by the Apache License, Version 2.0 #include "base/seastarx.h" +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/client_quota_translator.h" +#include + #include #include #include @@ -19,8 +22,8 @@ using namespace kafka; -const ss::sstring client_id = "franz-go"; -const tracker_key client_id_key = k_client_id{client_id}; +const ss::sstring test_client_id = "franz-go"; +const tracker_key test_client_id_key = k_client_id{test_client_id}; constexpr std::string_view raw_basic_produce_config = R"([ { @@ -53,39 +56,62 @@ const auto CHECK_VARIANT_EQ = [](auto expected, auto got) { BOOST_CHECK_EQUAL(expected, get(got)); }; -BOOST_AUTO_TEST_CASE(quota_translator_default_test) { - client_quota_translator tr; +void reset_configs() { + config::shard_local_cfg().target_quota_byte_rate.reset(); + config::shard_local_cfg().target_fetch_quota_byte_rate.reset(); + config::shard_local_cfg().kafka_admin_topic_api_rate.reset(); + config::shard_local_cfg().kafka_client_group_byte_rate_quota.reset(); + config::shard_local_cfg().kafka_client_group_fetch_byte_rate_quota.reset(); +} + +struct fixture { + ss::sharded quota_store; + kafka::client_quota_translator tr; + + fixture() + : tr(std::ref(quota_store)) { + quota_store.start().get(); + } + + ~fixture() { quota_store.stop().get(); } +}; + +SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) { + reset_configs(); + fixture f; auto default_limits = client_quota_limits{ .produce_limit = 2147483648, .fetch_limit = std::nullopt, .partition_mutation_limit = std::nullopt, }; - auto [key, limits] = tr.find_quota( - {client_quota_type::produce_quota, client_id}); - BOOST_CHECK_EQUAL(client_id_key, key); + auto [key, limits] = f.tr.find_quota( + {client_quota_type::produce_quota, test_client_id}); + BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(default_limits, limits); } -BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) { +SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) { + reset_configs(); config::shard_local_cfg().target_quota_byte_rate.set_value(1111); config::shard_local_cfg().target_fetch_quota_byte_rate.set_value(2222); config::shard_local_cfg().kafka_admin_topic_api_rate.set_value(3333); - client_quota_translator tr; + fixture f; auto expected_limits = client_quota_limits{ .produce_limit = 1111, .fetch_limit = 2222, .partition_mutation_limit = 3333, }; - auto [key, limits] = tr.find_quota( - {client_quota_type::produce_quota, client_id}); - BOOST_CHECK_EQUAL(client_id_key, key); + auto [key, limits] = f.tr.find_quota( + {client_quota_type::produce_quota, test_client_id}); + BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(expected_limits, limits); } -BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { +SEASTAR_THREAD_TEST_CASE(quota_translator_client_group_test) { + reset_configs(); constexpr auto P_DEF = 1111; constexpr auto F_DEF = 2222; constexpr auto PM_DEF = 3333; @@ -100,18 +126,19 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { .kafka_client_group_fetch_byte_rate_quota.set_value( YAML::Load(std::string(raw_basic_fetch_config))); - client_quota_translator tr; + fixture f; // Stage 1 - Start by checking that tracker_key's are correctly detected // for various client ids - auto get_produce_key = [&tr](auto client_id) { - return tr.find_quota_key({client_quota_type::produce_quota, client_id}); + auto get_produce_key = [&f](auto client_id) { + return f.tr.find_quota_key( + {client_quota_type::produce_quota, client_id}); }; - auto get_fetch_key = [&tr](auto client_id) { - return tr.find_quota_key({client_quota_type::fetch_quota, client_id}); + auto get_fetch_key = [&f](auto client_id) { + return f.tr.find_quota_key({client_quota_type::fetch_quota, client_id}); }; - auto get_mutation_key = [&tr](auto client_id) { - return tr.find_quota_key( + auto get_mutation_key = [&f](auto client_id) { + return f.tr.find_quota_key( {client_quota_type::partition_mutation_quota, client_id}); }; @@ -146,38 +173,38 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { // Check limits for the franz-go groups auto franz_go_produce_limits = client_quota_limits{ .produce_limit = 4096, - .fetch_limit = F_DEF, - .partition_mutation_limit = PM_DEF, + .fetch_limit = {}, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( franz_go_produce_limits, - tr.find_quota_value(k_group_name{"franz-go-produce-group"})); + f.tr.find_quota_value(k_group_name{"franz-go-produce-group"})); auto franz_go_fetch_limits = client_quota_limits{ - .produce_limit = P_DEF, + .produce_limit = {}, .fetch_limit = 4097, - .partition_mutation_limit = PM_DEF, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( franz_go_fetch_limits, - tr.find_quota_value(k_group_name{"franz-go-fetch-group"})); + f.tr.find_quota_value(k_group_name{"franz-go-fetch-group"})); // Check limits for the not-franz-go groups auto not_franz_go_produce_limits = client_quota_limits{ .produce_limit = 2048, - .fetch_limit = F_DEF, - .partition_mutation_limit = PM_DEF, + .fetch_limit = {}, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( not_franz_go_produce_limits, - tr.find_quota_value(k_group_name{"not-franz-go-produce-group"})); + f.tr.find_quota_value(k_group_name{"not-franz-go-produce-group"})); auto not_franz_go_fetch_limits = client_quota_limits{ - .produce_limit = P_DEF, + .produce_limit = {}, .fetch_limit = 2049, - .partition_mutation_limit = PM_DEF, + .partition_mutation_limit = {}, }; BOOST_CHECK_EQUAL( not_franz_go_fetch_limits, - tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"})); + f.tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"})); // Check limits for the non-client-group keys auto default_limits = client_quota_limits{ @@ -186,10 +213,10 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) { .partition_mutation_limit = PM_DEF, }; BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"unknown"})); - BOOST_CHECK_EQUAL(default_limits, tr.find_quota_value(k_client_id{""})); + default_limits, f.tr.find_quota_value(k_client_id{"unknown"})); + BOOST_CHECK_EQUAL(default_limits, f.tr.find_quota_value(k_client_id{""})); BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"franz-go"})); + default_limits, f.tr.find_quota_value(k_client_id{"franz-go"})); BOOST_CHECK_EQUAL( - default_limits, tr.find_quota_value(k_client_id{"not-franz-go"})); + default_limits, f.tr.find_quota_value(k_client_id{"not-franz-go"})); } diff --git a/src/v/kafka/server/tests/quota_manager_bench.cc b/src/v/kafka/server/tests/quota_manager_bench.cc index 956244a25dd6..b4646566816c 100644 --- a/src/v/kafka/server/tests/quota_manager_bench.cc +++ b/src/v/kafka/server/tests/quota_manager_bench.cc @@ -8,6 +8,7 @@ * the Business Source License, use of this software will be governed * by the Apache License, Version 2.0 */ +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/quota_manager.h" @@ -61,9 +62,11 @@ send_requests(kafka::quota_manager& qm, size_t count, bool use_unique) { } ss::future<> test_quota_manager(size_t count, bool use_unique) { + ss::sharded quota_store; kafka::quota_manager::client_quotas_t buckets_map; ss::sharded sqm; - co_await sqm.start(std::ref(buckets_map)); + co_await quota_store.start(); + co_await sqm.start(std::ref(buckets_map), std::ref(quota_store)); co_await sqm.invoke_on_all(&kafka::quota_manager::start); perf_tests::start_measuring_time(); @@ -72,6 +75,7 @@ ss::future<> test_quota_manager(size_t count, bool use_unique) { }); perf_tests::stop_measuring_time(); co_await sqm.stop(); + co_await quota_store.stop(); } struct test_case { diff --git a/src/v/kafka/server/tests/quota_manager_test.cc b/src/v/kafka/server/tests/quota_manager_test.cc index 0c8c1de470dd..c7a9ac988bf5 100644 --- a/src/v/kafka/server/tests/quota_manager_test.cc +++ b/src/v/kafka/server/tests/quota_manager_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "cluster/client_quota_store.h" #include "config/configuration.h" #include "kafka/server/client_quota_translator.h" #include "kafka/server/quota_manager.h" @@ -27,14 +28,19 @@ const static auto client_id = "franz-go"; struct fixture { kafka::quota_manager::client_quotas_t buckets_map; + ss::sharded quota_store; ss::sharded sqm; ss::future<> start() { - co_await sqm.start(std::ref(buckets_map)); + co_await quota_store.start(); + co_await sqm.start(std::ref(buckets_map), std::ref(quota_store)); co_await sqm.invoke_on_all(&kafka::quota_manager::start); } - ss::future<> stop() { co_await sqm.stop(); } + ss::future<> stop() { + co_await sqm.stop(); + co_await quota_store.stop(); + } }; template @@ -196,8 +202,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 4096); BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); BOOST_CHECK_EQUAL(it->second->tp_fetch_rate->rate(), 4097); - BOOST_REQUIRE(it->second->pm_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->pm_rate->rate(), 1026); + BOOST_REQUIRE(!it->second->pm_rate.has_value()); } { ss::sstring client_id = "not-franz-go"; @@ -211,8 +216,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 2048); BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); BOOST_CHECK_EQUAL(it->second->tp_fetch_rate->rate(), 2049); - BOOST_REQUIRE(it->second->pm_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->pm_rate->rate(), 1026); + BOOST_REQUIRE(!it->second->pm_rate.has_value()); } { ss::sstring client_id = "unconfigured"; @@ -233,6 +237,7 @@ SEASTAR_THREAD_TEST_CASE(static_config_test) { SEASTAR_THREAD_TEST_CASE(update_test) { using clock = kafka::quota_manager::clock; using k_group_name = kafka::k_group_name; + using k_client_id = kafka::k_client_id; fixture f; f.start().get(); auto stop = ss::defer([&] { f.stop().get(); }); @@ -294,16 +299,24 @@ SEASTAR_THREAD_TEST_CASE(update_test) { conf.kafka_client_group_byte_rate_quota.set_value(produce_config); }).get(); - // Check the rate has been updated + // Check the produce rate has been updated on the group auto it = f.buckets_map.local()->find( k_group_name{client_id + "-group"}); BOOST_REQUIRE(it != f.buckets_map.local()->end()); - BOOST_REQUIRE(it->second->tp_produce_rate.has_value()); - BOOST_CHECK_EQUAL(it->second->tp_produce_rate->rate(), 1024); + BOOST_CHECK(!it->second->tp_produce_rate.has_value()); // Check fetch is the same bucket BOOST_REQUIRE(it->second->tp_fetch_rate.has_value()); auto delay = f.sqm.local().throttle_fetch_tp(client_id, now).get(); BOOST_CHECK_EQUAL(delay / 1ms, 1000); + + // Check the new produce rate now applies + f.sqm.local() + .record_produce_tp_and_throttle(client_id, 8192, now) + .get(); + auto client_it = f.buckets_map.local()->find(k_client_id{client_id}); + BOOST_REQUIRE(client_it != f.buckets_map.local()->end()); + BOOST_REQUIRE(client_it->second->tp_produce_rate.has_value()); + BOOST_CHECK_EQUAL(client_it->second->tp_produce_rate->rate(), 1024); } } diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 052e22802274..7cd5fb2a6c43 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1703,7 +1703,11 @@ void application::wire_up_redpanda_services( // metrics and quota management syschecks::systemd_message("Adding kafka quota managers").get(); - construct_service(quota_mgr, std::ref(quota_mgr_state)).get(); + construct_service( + quota_mgr, + std::ref(quota_mgr_state), + std::ref(controller->get_quota_store())) + .get(); construct_service(snc_quota_mgr, std::ref(snc_node_quota)).get(); syschecks::systemd_message("Creating auditing subsystem").get();