Skip to content

Commit

Permalink
Merge pull request #18777 from pgellert/quotas/integrate-quota-store-…
Browse files Browse the repository at this point in the history
…translator-basics

CORE-2699 Quotas: prepare for using the quota store in `client_quota_translator`
  • Loading branch information
pgellert authored Jun 5, 2024
2 parents 66c2664 + c1b933d commit 6bc4a00
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 77 deletions.
59 changes: 37 additions & 22 deletions src/v/kafka/server/client_quota_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ std::ostream& operator<<(std::ostream& os, const client_quota_limits& l) {
return os;
}

client_quota_translator::client_quota_translator()
: _default_target_produce_tp_rate(
config::shard_local_cfg().target_quota_byte_rate.bind())
client_quota_translator::client_quota_translator(
ss::sharded<cluster::client_quota::store>& quota_store)
: _quota_store(quota_store)
, _default_target_produce_tp_rate(
config::shard_local_cfg().target_quota_byte_rate.bind())
, _default_target_fetch_tp_rate(
config::shard_local_cfg().target_fetch_quota_byte_rate.bind())
, _target_partition_mutation_quota(
Expand All @@ -42,36 +44,48 @@ client_quota_translator::client_quota_translator()
config::shard_local_cfg()
.kafka_client_group_fetch_byte_rate_quota.bind()) {}

uint64_t client_quota_translator::get_client_target_produce_tp_rate(
std::optional<uint64_t>
client_quota_translator::get_client_target_produce_tp_rate(
const tracker_key& quota_id) {
return ss::visit(
return get_client_quota_value(
quota_id,
[this](const k_client_id&) -> uint64_t {
return _default_target_produce_tp_rate();
},
[this](const k_group_name& k) -> uint64_t {
auto group = _target_produce_tp_rate_per_client_group().find(k);
if (group != _target_produce_tp_rate_per_client_group().end()) {
return group->second.quota;
}
return _default_target_produce_tp_rate();
});
_target_produce_tp_rate_per_client_group(),
_default_target_produce_tp_rate());
}

std::optional<uint64_t>
client_quota_translator::get_client_target_fetch_tp_rate(
const tracker_key& quota_id) {
return get_client_quota_value(
quota_id,
_target_fetch_tp_rate_per_client_group(),
_default_target_fetch_tp_rate());
}

std::optional<uint64_t>
client_quota_translator::get_client_target_partition_mutation_rate(
const tracker_key& quota_id) {
return get_client_quota_value(
quota_id, {}, _target_partition_mutation_quota());
}

std::optional<uint64_t> client_quota_translator::get_client_quota_value(
const tracker_key& quota_id,
const std::unordered_map<ss::sstring, config::client_group_quota>&
group_quota_config,
std::optional<uint64_t> default_value_config) {
return ss::visit(
quota_id,
[this](const k_client_id&) -> std::optional<uint64_t> {
return _default_target_fetch_tp_rate();
[&default_value_config](const k_client_id&) -> std::optional<uint64_t> {
return default_value_config;
},
[this](const k_group_name& k) -> std::optional<uint64_t> {
auto group = _target_fetch_tp_rate_per_client_group().find(k);
if (group != _target_fetch_tp_rate_per_client_group().end()) {
[&group_quota_config](const k_group_name& k) -> std::optional<uint64_t> {
auto group = group_quota_config.find(k);
if (group != group_quota_config.end()) {
return group->second.quota;
}
return _default_target_fetch_tp_rate();

return {};
});
}

Expand Down Expand Up @@ -140,7 +154,8 @@ client_quota_translator::find_quota_value(const tracker_key& key) {
return client_quota_limits{
.produce_limit = get_client_target_produce_tp_rate(key),
.fetch_limit = get_client_target_fetch_tp_rate(key),
.partition_mutation_limit = _target_partition_mutation_quota(),
.partition_mutation_limit = get_client_target_partition_mutation_rate(
key),
};
}

Expand Down
17 changes: 15 additions & 2 deletions src/v/kafka/server/client_quota_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#pragma once

#include "base/seastarx.h"
#include "cluster/fwd.h"
#include "config/client_group_byte_rate_quota.h"
#include "config/property.h"
#include "utils/named_type.h"

#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>

#include <utility>
Expand Down Expand Up @@ -75,7 +77,8 @@ class client_quota_translator {
public:
using on_change_fn = std::function<void()>;

client_quota_translator();
explicit client_quota_translator(
ss::sharded<cluster::client_quota::store>&);

/// Returns the quota tracker key applicable to the given quota context
/// Note: because the client quotas configured for produce/fetch/pm might be
Expand All @@ -102,9 +105,19 @@ class client_quota_translator {
tracker_key
get_partition_mutation_key(std::optional<std::string_view> client_id);

uint64_t get_client_target_produce_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_produce_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_fetch_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_partition_mutation_rate(const tracker_key& quota_id);
std::optional<uint64_t> get_client_quota_value(
const tracker_key& quota_id,
const std::unordered_map<ss::sstring, config::client_group_quota>&
group_quota_config,
std::optional<uint64_t> default_value_config);

ss::sharded<cluster::client_quota::store>& _quota_store;

config::binding<uint32_t> _default_target_produce_tp_rate;
config::binding<std::optional<uint32_t>> _default_target_fetch_tp_rate;
Expand Down
11 changes: 7 additions & 4 deletions src/v/kafka/server/quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ namespace kafka {

using clock = quota_manager::clock;

quota_manager::quota_manager(client_quotas_t& client_quotas)
quota_manager::quota_manager(
client_quotas_t& client_quotas,
ss::sharded<cluster::client_quota::store>& client_quota_store)
: _default_num_windows(config::shard_local_cfg().default_num_windows.bind())
, _default_window_width(config::shard_local_cfg().default_window_sec.bind())
, _replenish_threshold(
config::shard_local_cfg().kafka_throughput_replenish_threshold.bind())
, _client_quotas{client_quotas}
, _translator{}
, _translator{client_quota_store}
, _gc_freq(config::shard_local_cfg().quota_manager_gc_sec())
, _max_delay(config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) {
if (seastar::this_shard_id() == _client_quotas.shard_id()) {
Expand Down Expand Up @@ -74,8 +76,9 @@ ss::future<clock::duration> quota_manager::maybe_add_and_retrieve_quota(
auto it = _client_quotas->find(qid);
if (it == _client_quotas->end()) {
co_await container().invoke_on(
_client_quotas.shard_id(),
[qid, now](quota_manager& me) { return me.add_quota_id(qid, now); });
_client_quotas.shard_id(), [qid, now](quota_manager& me) mutable {
return me.add_quota_id(std::move(qid), now);
});

it = _client_quotas->find(qid);
if (it == _client_quotas->end()) {
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class quota_manager : public ss::peering_sharded_service<quota_manager> {
= absl::node_hash_map<tracker_key, ss::lw_shared_ptr<client_quota>>;
using client_quotas_t = ssx::sharded_ptr<client_quotas_map_t>;

explicit quota_manager(client_quotas_t& client_quotas);
quota_manager(
client_quotas_t& client_quotas,
ss::sharded<cluster::client_quota::store>& client_quota_store);
quota_manager(const quota_manager&) = delete;
quota_manager& operator=(const quota_manager&) = delete;
quota_manager(quota_manager&&) = delete;
Expand Down
10 changes: 9 additions & 1 deletion src/v/kafka/server/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rp_test(
fetch_unit_test.cc
config_utils_test.cc
config_response_utils_test.cc
client_quota_translator_test.cc
DEFINITIONS BOOST_TEST_DYN_LINK
LIBRARIES Boost::unit_test_framework v::kafka
LABELS kafka
Expand Down Expand Up @@ -78,6 +77,15 @@ rp_test(
LABELS kafka
)

rp_test(
FIXTURE_TEST
BINARY_NAME quota_translator
SOURCES client_quota_translator_test.cc
LIBRARIES v::seastar_testing_main v::kafka
ARGS "-- -c 1"
LABELS kafka
)

rp_test(
BENCHMARK_TEST
BINARY_NAME kafka_fetch_plan
Expand Down
99 changes: 63 additions & 36 deletions src/v/kafka/server/tests/client_quota_translator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
// by the Apache License, Version 2.0

#include "base/seastarx.h"
#include "cluster/client_quota_store.h"
#include "config/configuration.h"
#include "kafka/server/client_quota_translator.h"

#include <seastar/testing/thread_test_case.hh>

#include <boost/test/auto_unit_test.hpp>
#include <boost/test/test_tools.hpp>
#include <boost/test/unit_test.hpp>
Expand All @@ -19,8 +22,8 @@

using namespace kafka;

const ss::sstring client_id = "franz-go";
const tracker_key client_id_key = k_client_id{client_id};
const ss::sstring test_client_id = "franz-go";
const tracker_key test_client_id_key = k_client_id{test_client_id};

constexpr std::string_view raw_basic_produce_config = R"([
{
Expand Down Expand Up @@ -53,39 +56,62 @@ const auto CHECK_VARIANT_EQ = [](auto expected, auto got) {
BOOST_CHECK_EQUAL(expected, get<decltype(expected)>(got));
};

BOOST_AUTO_TEST_CASE(quota_translator_default_test) {
client_quota_translator tr;
void reset_configs() {
config::shard_local_cfg().target_quota_byte_rate.reset();
config::shard_local_cfg().target_fetch_quota_byte_rate.reset();
config::shard_local_cfg().kafka_admin_topic_api_rate.reset();
config::shard_local_cfg().kafka_client_group_byte_rate_quota.reset();
config::shard_local_cfg().kafka_client_group_fetch_byte_rate_quota.reset();
}

struct fixture {
ss::sharded<cluster::client_quota::store> quota_store;
kafka::client_quota_translator tr;

fixture()
: tr(std::ref(quota_store)) {
quota_store.start().get();
}

~fixture() { quota_store.stop().get(); }
};

SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) {
reset_configs();
fixture f;

auto default_limits = client_quota_limits{
.produce_limit = 2147483648,
.fetch_limit = std::nullopt,
.partition_mutation_limit = std::nullopt,
};
auto [key, limits] = tr.find_quota(
{client_quota_type::produce_quota, client_id});
BOOST_CHECK_EQUAL(client_id_key, key);
auto [key, limits] = f.tr.find_quota(
{client_quota_type::produce_quota, test_client_id});
BOOST_CHECK_EQUAL(test_client_id_key, key);
BOOST_CHECK_EQUAL(default_limits, limits);
}

BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) {
SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) {
reset_configs();
config::shard_local_cfg().target_quota_byte_rate.set_value(1111);
config::shard_local_cfg().target_fetch_quota_byte_rate.set_value(2222);
config::shard_local_cfg().kafka_admin_topic_api_rate.set_value(3333);

client_quota_translator tr;
fixture f;

auto expected_limits = client_quota_limits{
.produce_limit = 1111,
.fetch_limit = 2222,
.partition_mutation_limit = 3333,
};
auto [key, limits] = tr.find_quota(
{client_quota_type::produce_quota, client_id});
BOOST_CHECK_EQUAL(client_id_key, key);
auto [key, limits] = f.tr.find_quota(
{client_quota_type::produce_quota, test_client_id});
BOOST_CHECK_EQUAL(test_client_id_key, key);
BOOST_CHECK_EQUAL(expected_limits, limits);
}

BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
SEASTAR_THREAD_TEST_CASE(quota_translator_client_group_test) {
reset_configs();
constexpr auto P_DEF = 1111;
constexpr auto F_DEF = 2222;
constexpr auto PM_DEF = 3333;
Expand All @@ -100,18 +126,19 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
.kafka_client_group_fetch_byte_rate_quota.set_value(
YAML::Load(std::string(raw_basic_fetch_config)));

client_quota_translator tr;
fixture f;

// Stage 1 - Start by checking that tracker_key's are correctly detected
// for various client ids
auto get_produce_key = [&tr](auto client_id) {
return tr.find_quota_key({client_quota_type::produce_quota, client_id});
auto get_produce_key = [&f](auto client_id) {
return f.tr.find_quota_key(
{client_quota_type::produce_quota, client_id});
};
auto get_fetch_key = [&tr](auto client_id) {
return tr.find_quota_key({client_quota_type::fetch_quota, client_id});
auto get_fetch_key = [&f](auto client_id) {
return f.tr.find_quota_key({client_quota_type::fetch_quota, client_id});
};
auto get_mutation_key = [&tr](auto client_id) {
return tr.find_quota_key(
auto get_mutation_key = [&f](auto client_id) {
return f.tr.find_quota_key(
{client_quota_type::partition_mutation_quota, client_id});
};

Expand Down Expand Up @@ -146,38 +173,38 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
// Check limits for the franz-go groups
auto franz_go_produce_limits = client_quota_limits{
.produce_limit = 4096,
.fetch_limit = F_DEF,
.partition_mutation_limit = PM_DEF,
.fetch_limit = {},
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
franz_go_produce_limits,
tr.find_quota_value(k_group_name{"franz-go-produce-group"}));
f.tr.find_quota_value(k_group_name{"franz-go-produce-group"}));
auto franz_go_fetch_limits = client_quota_limits{
.produce_limit = P_DEF,
.produce_limit = {},
.fetch_limit = 4097,
.partition_mutation_limit = PM_DEF,
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
franz_go_fetch_limits,
tr.find_quota_value(k_group_name{"franz-go-fetch-group"}));
f.tr.find_quota_value(k_group_name{"franz-go-fetch-group"}));

// Check limits for the not-franz-go groups
auto not_franz_go_produce_limits = client_quota_limits{
.produce_limit = 2048,
.fetch_limit = F_DEF,
.partition_mutation_limit = PM_DEF,
.fetch_limit = {},
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
not_franz_go_produce_limits,
tr.find_quota_value(k_group_name{"not-franz-go-produce-group"}));
f.tr.find_quota_value(k_group_name{"not-franz-go-produce-group"}));
auto not_franz_go_fetch_limits = client_quota_limits{
.produce_limit = P_DEF,
.produce_limit = {},
.fetch_limit = 2049,
.partition_mutation_limit = PM_DEF,
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
not_franz_go_fetch_limits,
tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"}));
f.tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"}));

// Check limits for the non-client-group keys
auto default_limits = client_quota_limits{
Expand All @@ -186,10 +213,10 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
.partition_mutation_limit = PM_DEF,
};
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"unknown"}));
BOOST_CHECK_EQUAL(default_limits, tr.find_quota_value(k_client_id{""}));
default_limits, f.tr.find_quota_value(k_client_id{"unknown"}));
BOOST_CHECK_EQUAL(default_limits, f.tr.find_quota_value(k_client_id{""}));
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"franz-go"}));
default_limits, f.tr.find_quota_value(k_client_id{"franz-go"}));
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"not-franz-go"}));
default_limits, f.tr.find_quota_value(k_client_id{"not-franz-go"}));
}
Loading

0 comments on commit 6bc4a00

Please sign in to comment.