Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-2699 Quotas: prepare for using the quota store in client_quota_translator #18777

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions src/v/kafka/server/client_quota_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ std::ostream& operator<<(std::ostream& os, const client_quota_limits& l) {
return os;
}

client_quota_translator::client_quota_translator()
: _default_target_produce_tp_rate(
config::shard_local_cfg().target_quota_byte_rate.bind())
client_quota_translator::client_quota_translator(
ss::sharded<cluster::client_quota::store>& quota_store)
: _quota_store(quota_store)
, _default_target_produce_tp_rate(
config::shard_local_cfg().target_quota_byte_rate.bind())
, _default_target_fetch_tp_rate(
config::shard_local_cfg().target_fetch_quota_byte_rate.bind())
, _target_partition_mutation_quota(
Expand All @@ -42,36 +44,48 @@ client_quota_translator::client_quota_translator()
config::shard_local_cfg()
.kafka_client_group_fetch_byte_rate_quota.bind()) {}

uint64_t client_quota_translator::get_client_target_produce_tp_rate(
std::optional<uint64_t>
client_quota_translator::get_client_target_produce_tp_rate(
const tracker_key& quota_id) {
return ss::visit(
return get_client_quota_value(
quota_id,
[this](const k_client_id&) -> uint64_t {
return _default_target_produce_tp_rate();
},
[this](const k_group_name& k) -> uint64_t {
auto group = _target_produce_tp_rate_per_client_group().find(k);
if (group != _target_produce_tp_rate_per_client_group().end()) {
return group->second.quota;
}
return _default_target_produce_tp_rate();
});
_target_produce_tp_rate_per_client_group(),
_default_target_produce_tp_rate());
}

std::optional<uint64_t>
client_quota_translator::get_client_target_fetch_tp_rate(
const tracker_key& quota_id) {
return get_client_quota_value(
quota_id,
_target_fetch_tp_rate_per_client_group(),
_default_target_fetch_tp_rate());
}

std::optional<uint64_t>
client_quota_translator::get_client_target_partition_mutation_rate(
const tracker_key& quota_id) {
return get_client_quota_value(
quota_id, {}, _target_partition_mutation_quota());
}

std::optional<uint64_t> client_quota_translator::get_client_quota_value(
const tracker_key& quota_id,
const std::unordered_map<ss::sstring, config::client_group_quota>&
group_quota_config,
std::optional<uint64_t> default_value_config) {
return ss::visit(
quota_id,
[this](const k_client_id&) -> std::optional<uint64_t> {
return _default_target_fetch_tp_rate();
[&default_value_config](const k_client_id&) -> std::optional<uint64_t> {
return default_value_config;
},
[this](const k_group_name& k) -> std::optional<uint64_t> {
auto group = _target_fetch_tp_rate_per_client_group().find(k);
if (group != _target_fetch_tp_rate_per_client_group().end()) {
[&group_quota_config](const k_group_name& k) -> std::optional<uint64_t> {
auto group = group_quota_config.find(k);
if (group != group_quota_config.end()) {
return group->second.quota;
}
return _default_target_fetch_tp_rate();

return {};
});
}

Expand Down Expand Up @@ -140,7 +154,8 @@ client_quota_translator::find_quota_value(const tracker_key& key) {
return client_quota_limits{
.produce_limit = get_client_target_produce_tp_rate(key),
.fetch_limit = get_client_target_fetch_tp_rate(key),
.partition_mutation_limit = _target_partition_mutation_quota(),
.partition_mutation_limit = get_client_target_partition_mutation_rate(
key),
};
}

Expand Down
17 changes: 15 additions & 2 deletions src/v/kafka/server/client_quota_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#pragma once

#include "base/seastarx.h"
#include "cluster/fwd.h"
#include "config/client_group_byte_rate_quota.h"
#include "config/property.h"
#include "utils/named_type.h"

#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>

#include <utility>
Expand Down Expand Up @@ -75,7 +77,8 @@ class client_quota_translator {
public:
using on_change_fn = std::function<void()>;

client_quota_translator();
explicit client_quota_translator(
ss::sharded<cluster::client_quota::store>&);

/// Returns the quota tracker key applicable to the given quota context
/// Note: because the client quotas configured for produce/fetch/pm might be
Expand All @@ -102,9 +105,19 @@ class client_quota_translator {
tracker_key
get_partition_mutation_key(std::optional<std::string_view> client_id);

uint64_t get_client_target_produce_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_produce_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_fetch_tp_rate(const tracker_key& quota_id);
std::optional<uint64_t>
get_client_target_partition_mutation_rate(const tracker_key& quota_id);
std::optional<uint64_t> get_client_quota_value(
const tracker_key& quota_id,
const std::unordered_map<ss::sstring, config::client_group_quota>&
group_quota_config,
std::optional<uint64_t> default_value_config);

ss::sharded<cluster::client_quota::store>& _quota_store;

config::binding<uint32_t> _default_target_produce_tp_rate;
config::binding<std::optional<uint32_t>> _default_target_fetch_tp_rate;
Expand Down
11 changes: 7 additions & 4 deletions src/v/kafka/server/quota_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ namespace kafka {

using clock = quota_manager::clock;

quota_manager::quota_manager(client_quotas_t& client_quotas)
quota_manager::quota_manager(
client_quotas_t& client_quotas,
ss::sharded<cluster::client_quota::store>& client_quota_store)
: _default_num_windows(config::shard_local_cfg().default_num_windows.bind())
, _default_window_width(config::shard_local_cfg().default_window_sec.bind())
, _replenish_threshold(
config::shard_local_cfg().kafka_throughput_replenish_threshold.bind())
, _client_quotas{client_quotas}
, _translator{}
, _translator{client_quota_store}
, _gc_freq(config::shard_local_cfg().quota_manager_gc_sec())
, _max_delay(config::shard_local_cfg().max_kafka_throttle_delay_ms.bind()) {
if (seastar::this_shard_id() == _client_quotas.shard_id()) {
Expand Down Expand Up @@ -74,8 +76,9 @@ ss::future<clock::duration> quota_manager::maybe_add_and_retrieve_quota(
auto it = _client_quotas->find(qid);
if (it == _client_quotas->end()) {
co_await container().invoke_on(
_client_quotas.shard_id(),
[qid, now](quota_manager& me) { return me.add_quota_id(qid, now); });
_client_quotas.shard_id(), [qid, now](quota_manager& me) mutable {
return me.add_quota_id(std::move(qid), now);
});

it = _client_quotas->find(qid);
if (it == _client_quotas->end()) {
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/quota_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class quota_manager : public ss::peering_sharded_service<quota_manager> {
= absl::node_hash_map<tracker_key, ss::lw_shared_ptr<client_quota>>;
using client_quotas_t = ssx::sharded_ptr<client_quotas_map_t>;

explicit quota_manager(client_quotas_t& client_quotas);
quota_manager(
client_quotas_t& client_quotas,
ss::sharded<cluster::client_quota::store>& client_quota_store);
quota_manager(const quota_manager&) = delete;
quota_manager& operator=(const quota_manager&) = delete;
quota_manager(quota_manager&&) = delete;
Expand Down
10 changes: 9 additions & 1 deletion src/v/kafka/server/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rp_test(
fetch_unit_test.cc
config_utils_test.cc
config_response_utils_test.cc
client_quota_translator_test.cc
DEFINITIONS BOOST_TEST_DYN_LINK
LIBRARIES Boost::unit_test_framework v::kafka
LABELS kafka
Expand Down Expand Up @@ -78,6 +77,15 @@ rp_test(
LABELS kafka
)

rp_test(
FIXTURE_TEST
BINARY_NAME quota_translator
SOURCES client_quota_translator_test.cc
LIBRARIES v::seastar_testing_main v::kafka
ARGS "-- -c 1"
LABELS kafka
)

rp_test(
BENCHMARK_TEST
BINARY_NAME kafka_fetch_plan
Expand Down
99 changes: 63 additions & 36 deletions src/v/kafka/server/tests/client_quota_translator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
// by the Apache License, Version 2.0

#include "base/seastarx.h"
#include "cluster/client_quota_store.h"
#include "config/configuration.h"
#include "kafka/server/client_quota_translator.h"

#include <seastar/testing/thread_test_case.hh>

#include <boost/test/auto_unit_test.hpp>
#include <boost/test/test_tools.hpp>
#include <boost/test/unit_test.hpp>
Expand All @@ -19,8 +22,8 @@

using namespace kafka;

const ss::sstring client_id = "franz-go";
const tracker_key client_id_key = k_client_id{client_id};
const ss::sstring test_client_id = "franz-go";
const tracker_key test_client_id_key = k_client_id{test_client_id};

constexpr std::string_view raw_basic_produce_config = R"([
{
Expand Down Expand Up @@ -53,39 +56,62 @@ const auto CHECK_VARIANT_EQ = [](auto expected, auto got) {
BOOST_CHECK_EQUAL(expected, get<decltype(expected)>(got));
};

BOOST_AUTO_TEST_CASE(quota_translator_default_test) {
client_quota_translator tr;
void reset_configs() {
config::shard_local_cfg().target_quota_byte_rate.reset();
config::shard_local_cfg().target_fetch_quota_byte_rate.reset();
config::shard_local_cfg().kafka_admin_topic_api_rate.reset();
config::shard_local_cfg().kafka_client_group_byte_rate_quota.reset();
config::shard_local_cfg().kafka_client_group_fetch_byte_rate_quota.reset();
}

struct fixture {
ss::sharded<cluster::client_quota::store> quota_store;
kafka::client_quota_translator tr;

fixture()
: tr(std::ref(quota_store)) {
quota_store.start().get();
}

~fixture() { quota_store.stop().get(); }
};

SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) {
reset_configs();
fixture f;

auto default_limits = client_quota_limits{
.produce_limit = 2147483648,
.fetch_limit = std::nullopt,
.partition_mutation_limit = std::nullopt,
};
auto [key, limits] = tr.find_quota(
{client_quota_type::produce_quota, client_id});
BOOST_CHECK_EQUAL(client_id_key, key);
auto [key, limits] = f.tr.find_quota(
{client_quota_type::produce_quota, test_client_id});
BOOST_CHECK_EQUAL(test_client_id_key, key);
BOOST_CHECK_EQUAL(default_limits, limits);
}

BOOST_AUTO_TEST_CASE(quota_translator_modified_default_test) {
SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) {
reset_configs();
config::shard_local_cfg().target_quota_byte_rate.set_value(1111);
config::shard_local_cfg().target_fetch_quota_byte_rate.set_value(2222);
config::shard_local_cfg().kafka_admin_topic_api_rate.set_value(3333);

client_quota_translator tr;
fixture f;

auto expected_limits = client_quota_limits{
.produce_limit = 1111,
.fetch_limit = 2222,
.partition_mutation_limit = 3333,
};
auto [key, limits] = tr.find_quota(
{client_quota_type::produce_quota, client_id});
BOOST_CHECK_EQUAL(client_id_key, key);
auto [key, limits] = f.tr.find_quota(
{client_quota_type::produce_quota, test_client_id});
BOOST_CHECK_EQUAL(test_client_id_key, key);
BOOST_CHECK_EQUAL(expected_limits, limits);
}

BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
SEASTAR_THREAD_TEST_CASE(quota_translator_client_group_test) {
reset_configs();
constexpr auto P_DEF = 1111;
constexpr auto F_DEF = 2222;
constexpr auto PM_DEF = 3333;
Expand All @@ -100,18 +126,19 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
.kafka_client_group_fetch_byte_rate_quota.set_value(
YAML::Load(std::string(raw_basic_fetch_config)));

client_quota_translator tr;
fixture f;

// Stage 1 - Start by checking that tracker_key's are correctly detected
// for various client ids
auto get_produce_key = [&tr](auto client_id) {
return tr.find_quota_key({client_quota_type::produce_quota, client_id});
auto get_produce_key = [&f](auto client_id) {
return f.tr.find_quota_key(
{client_quota_type::produce_quota, client_id});
};
auto get_fetch_key = [&tr](auto client_id) {
return tr.find_quota_key({client_quota_type::fetch_quota, client_id});
auto get_fetch_key = [&f](auto client_id) {
return f.tr.find_quota_key({client_quota_type::fetch_quota, client_id});
};
auto get_mutation_key = [&tr](auto client_id) {
return tr.find_quota_key(
auto get_mutation_key = [&f](auto client_id) {
return f.tr.find_quota_key(
{client_quota_type::partition_mutation_quota, client_id});
};

Expand Down Expand Up @@ -146,38 +173,38 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
// Check limits for the franz-go groups
auto franz_go_produce_limits = client_quota_limits{
.produce_limit = 4096,
.fetch_limit = F_DEF,
.partition_mutation_limit = PM_DEF,
.fetch_limit = {},
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
franz_go_produce_limits,
tr.find_quota_value(k_group_name{"franz-go-produce-group"}));
f.tr.find_quota_value(k_group_name{"franz-go-produce-group"}));
auto franz_go_fetch_limits = client_quota_limits{
.produce_limit = P_DEF,
.produce_limit = {},
.fetch_limit = 4097,
.partition_mutation_limit = PM_DEF,
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
franz_go_fetch_limits,
tr.find_quota_value(k_group_name{"franz-go-fetch-group"}));
f.tr.find_quota_value(k_group_name{"franz-go-fetch-group"}));

// Check limits for the not-franz-go groups
auto not_franz_go_produce_limits = client_quota_limits{
.produce_limit = 2048,
.fetch_limit = F_DEF,
.partition_mutation_limit = PM_DEF,
.fetch_limit = {},
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
not_franz_go_produce_limits,
tr.find_quota_value(k_group_name{"not-franz-go-produce-group"}));
f.tr.find_quota_value(k_group_name{"not-franz-go-produce-group"}));
auto not_franz_go_fetch_limits = client_quota_limits{
.produce_limit = P_DEF,
.produce_limit = {},
.fetch_limit = 2049,
.partition_mutation_limit = PM_DEF,
.partition_mutation_limit = {},
};
BOOST_CHECK_EQUAL(
not_franz_go_fetch_limits,
tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"}));
f.tr.find_quota_value(k_group_name{"not-franz-go-fetch-group"}));

// Check limits for the non-client-group keys
auto default_limits = client_quota_limits{
Expand All @@ -186,10 +213,10 @@ BOOST_AUTO_TEST_CASE(quota_translator_client_group_test) {
.partition_mutation_limit = PM_DEF,
};
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"unknown"}));
BOOST_CHECK_EQUAL(default_limits, tr.find_quota_value(k_client_id{""}));
default_limits, f.tr.find_quota_value(k_client_id{"unknown"}));
BOOST_CHECK_EQUAL(default_limits, f.tr.find_quota_value(k_client_id{""}));
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"franz-go"}));
default_limits, f.tr.find_quota_value(k_client_id{"franz-go"}));
BOOST_CHECK_EQUAL(
default_limits, tr.find_quota_value(k_client_id{"not-franz-go"}));
default_limits, f.tr.find_quota_value(k_client_id{"not-franz-go"}));
}
Loading
Loading