From 78e9a735563e9ba46dd6496b2b875af2c6237eaf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?=
Date: Tue, 18 Jun 2024 10:31:12 +0100
Subject: [PATCH 1/4] utils: add `log_hist_client_quota`

---
 src/v/utils/log_hist.cc | 14 ++++++++++++++
 src/v/utils/log_hist.h  | 17 +++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/src/v/utils/log_hist.cc b/src/v/utils/log_hist.cc
index 2ab590ed025e..ddf132956e1b 100644
--- a/src/v/utils/log_hist.cc
+++ b/src/v/utils/log_hist.cc
@@ -104,9 +104,23 @@ log_hist::
     return seastar_histogram_logform();
 }
 
+template<
+  typename duration_t,
+  int number_of_buckets,
+  uint64_t first_bucket_upper_bound>
+seastar::metrics::histogram
+log_hist::
+  client_quota_histogram_logform() const {
+    using client_quota_config = logform_config<1'000l, 1ul, 15>;
+
+    return seastar_histogram_logform();
+}
+
 // Explicit instantiation for log_hist_public
 template class log_hist;
 // Explicit instantiation for log_hist_internal
 template class log_hist;
 // Explicit instantiation for log_hist_read_dist
 template class log_hist;
+// Explicit instantiation for log_hist_client_quota
+template class log_hist;
diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h
index e8922a3e709f..c9e82c20783c 100644
--- a/src/v/utils/log_hist.h
+++ b/src/v/utils/log_hist.h
@@ -208,6 +208,15 @@ class log_hist {
      */
     seastar::metrics::histogram read_dist_histogram_logform() const;
 
+    /*
+     * Generates a Prometheus histogram with 15 buckets. The first bucket has an
+     * upper bound of 1 - 1 and subsequent buckets have an upper bound of 2
+     * times the upper bound of the previous bucket.
+     *
+     * This is the histogram type used for the Kafka client quota distribution.
+     */
+    seastar::metrics::histogram client_quota_histogram_logform() const;
+
 private:
     friend measurement;
 
@@ -241,3 +250,11 @@ using log_hist_internal = log_hist;
  * the first bucket to greater than 91 days in the last bucket.
  */
 using log_hist_read_dist = log_hist;
+
+/*
+ * This histogram has units of milliseconds instead of microseconds, and is
+ * used for measuring the Kafka client quota delays on the scale of less than
+ * 1 millisecond in the first bucket to greater than 32 seconds in the last
+ * bucket.
+ */
+using log_hist_client_quota = log_hist;

From 224ed42db38e47b8e52cb34e3422c035c4c6af9e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?=
Date: Fri, 7 Jun 2024 13:53:09 +0100
Subject: [PATCH 2/4] k/quota_mgr: add metrics on throttling

This adds two new metrics that provide insight into why and how
throttling is happening inside the quota_manager:

* client_quota_throttle_time: shows how much delay is being applied to
  client requests because of throttling.
* client_quota_throughput: shows the throughput from which the
  throttling delays are calculated.

Both metrics are reported at the granularity of quota rule and quota
type, to provide high-level insight into why and where throttling is
happening. For client-id-specific insight, the trace-level logger can
be used.
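As an illustration (not part of this patch), the per-rule/per-type
series can be aggregated from the scraped samples. The label names
below are the ones exercised by the ducktape test added later in this
series; the helper itself is hypothetical:

    # Sketch: total a *_sum sample set of the new histograms by quota
    # rule and quota type.
    from collections import defaultdict

    def sum_by_quota_labels(samples):
        totals = defaultdict(float)
        for s in samples:
            key = (s.labels.get("redpanda_quota_rule"),
                   s.labels.get("redpanda_quota_type"))
            totals[key] += s.value
        return totals

    # e.g. inside a ducktape test:
    #   sum_by_quota_labels(self.get_metrics("client_quota_throttle_time_sum"))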
---
 src/v/kafka/server/client_quota_translator.h |  13 ++
 src/v/kafka/server/quota_manager.cc          | 125 +++++++++++++++++++
 src/v/kafka/server/quota_manager.h           |   5 +
 3 files changed, 143 insertions(+)

diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h
index 31d4c00700de..c0c54c9fa048 100644
--- a/src/v/kafka/server/client_quota_translator.h
+++ b/src/v/kafka/server/client_quota_translator.h
@@ -65,6 +65,11 @@ enum class client_quota_type {
   partition_mutation_quota
 };
 
+static constexpr std::array all_client_quota_types = {
+  client_quota_type::produce_quota,
+  client_quota_type::fetch_quota,
+  client_quota_type::partition_mutation_quota};
+
 std::ostream& operator<<(std::ostream&, client_quota_type);
 
 struct client_quota_request_ctx {
@@ -85,6 +90,14 @@ enum class client_quota_rule {
   kafka_client_id
 };
 
+static constexpr std::array all_client_quota_rules = {
+  client_quota_rule::not_applicable,
+  client_quota_rule::kafka_client_default,
+  client_quota_rule::cluster_client_default,
+  client_quota_rule::kafka_client_prefix,
+  client_quota_rule::cluster_client_prefix,
+  client_quota_rule::kafka_client_id};
+
 std::ostream& operator<<(std::ostream&, client_quota_rule);
 
 struct client_quota_value {
diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc
index 47dc57468399..0ed4a9230126 100644
--- a/src/v/kafka/server/quota_manager.cc
+++ b/src/v/kafka/server/quota_manager.cc
@@ -16,10 +16,14 @@
 #include "kafka/server/atomic_token_bucket.h"
 #include "kafka/server/client_quota_translator.h"
 #include "kafka/server/logger.h"
+#include "metrics/metrics.h"
+#include "prometheus/prometheus_sanitize.h"
 #include "ssx/future-util.h"
+#include "utils/log_hist.h"
 
 #include
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +31,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
 
@@ -35,6 +40,114 @@ using namespace std::chrono_literals;
 
 namespace kafka {
 
+template
+class client_quotas_probe {
+public:
+    client_quotas_probe() = default;
+    client_quotas_probe(const client_quotas_probe&) = delete;
+    client_quotas_probe& operator=(const client_quotas_probe&) = delete;
+    client_quotas_probe(client_quotas_probe&&) = delete;
+    client_quotas_probe& operator=(client_quotas_probe&&) = delete;
+    ~client_quotas_probe() noexcept = default;
+
+    void setup_metrics() {
+        namespace sm = ss::metrics;
+
+        auto metric_defs = std::vector{};
+        metric_defs.reserve(
+          all_client_quota_types.size() * all_client_quota_rules.size() * 2);
+
+        auto rule_label = metrics::make_namespaced_label("quota_rule");
+        auto quota_type_label = metrics::make_namespaced_label("quota_type");
+
+        for (auto quota_type : all_client_quota_types) {
+            for (auto rule : all_client_quota_rules) {
+                metric_defs.emplace_back(
+                  sm::make_histogram(
+                    "client_quota_throttle_time",
+                    [this, rule, quota_type] {
+                        return get_throttle_time(rule, quota_type);
+                    },
+                    sm::description(
+                      "Client quota throttling delay per rule and "
+                      "quota type (in seconds)"),
+                    {rule_label(rule), quota_type_label(quota_type)})
+                    .aggregate({sm::shard_label}));
+                metric_defs.emplace_back(
+                  sm::make_histogram(
+                    "client_quota_throughput",
+                    [this, rule, quota_type] {
+                        return get_throughput(rule, quota_type);
+                    },
+                    sm::description(
+                      "Client quota throughput per rule and quota type"),
+                    {rule_label(rule), quota_type_label(quota_type)})
+                    .aggregate({sm::shard_label}));
+            }
+        }
+
+        auto group_name = prometheus_sanitize::metrics_name("kafka:quotas");
+        if (!config::shard_local_cfg().disable_metrics()) {
+            _internal_metrics.add_group(group_name, metric_defs);
+        }
+        if (!config::shard_local_cfg().disable_public_metrics()) {
+            _public_metrics.add_group(group_name, metric_defs);
+        }
+    }
+
+    void record_throttle_time(
+      client_quota_rule rule, client_quota_type qt, clock::duration t) {
+        auto& gm = (*this)(rule, qt);
+        // The Kafka response field `ThrottleTimeMs` is in milliseconds, so
+        // round the metric to milliseconds as well.
+        auto t_ms = std::chrono::duration_cast(t);
+        gm.throttle_time.record(t_ms);
+    }
+    void record_throughput(
+      client_quota_rule rule, client_quota_type qt, uint64_t count) {
+        auto& gm = (*this)(rule, qt);
+        gm.throughput.record(count);
+    }
+
+private:
+    ss::metrics::histogram
+    get_throttle_time(client_quota_rule rule, client_quota_type qt) const {
+        auto& gm = (*this)(rule, qt);
+        return gm.throttle_time.client_quota_histogram_logform();
+    }
+    ss::metrics::histogram
+    get_throughput(client_quota_rule rule, client_quota_type qt) const {
+        auto& gm = (*this)(rule, qt);
+        return gm.throughput.internal_histogram_logform();
+    }
+
+    struct granular_metrics {
+        log_hist_client_quota throttle_time;
+        log_hist_internal throughput;
+    };
+
+    granular_metrics&
+    operator()(client_quota_rule rule, client_quota_type type) {
+        return _metrics[static_cast(rule)][static_cast(type)];
+    }
+
+    const granular_metrics&
+    operator()(client_quota_rule rule, client_quota_type type) const {
+        return _metrics[static_cast(rule)][static_cast(type)];
+    }
+
+    // Assume the enum values are in sequence: [0, all_*.size())
+    static_assert(static_cast(all_client_quota_rules[0]) == 0);
+    static_assert(static_cast(all_client_quota_types[0]) == 0);
+    using metrics_container_t = std::array<
+      std::array,
+      all_client_quota_rules.size()>;
+
+    metrics::internal_metric_groups _internal_metrics;
+    metrics::public_metric_groups _public_metrics;
+    metrics_container_t _metrics{};
+};
+
 using clock = quota_manager::clock;
 
 quota_manager::quota_manager(
@@ -60,9 +173,12 @@ quota_manager::~quota_manager() { _gc_timer.cancel(); }
 ss::future<> quota_manager::stop() {
     _gc_timer.cancel();
     co_await _gate.close();
+    _probe.reset();
 }
 
 ss::future<> quota_manager::start() {
+    _probe = std::make_unique>();
+    _probe->setup_metrics();
     if (ss::this_shard_id() == _client_quotas.shard_id()) {
         co_await _client_quotas.reset(client_quotas_map_t{});
         _gc_timer.arm_periodic(_gc_freq);
@@ -207,7 +323,9 @@ ss::future quota_manager::record_partition_mutations(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, mutations);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
@@ -235,6 +353,7 @@ ss::future quota_manager::record_partition_mutations(
     });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
@@ -275,7 +394,9 @@ ss::future quota_manager::record_produce_tp_and_throttle(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, bytes);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
@@ -299,6 +420,7 @@ ss::future quota_manager::record_produce_tp_and_throttle(
     });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
@@ -321,6 +443,7 @@ ss::future<> quota_manager::record_fetch_tp(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, bytes);
     vlog(
       client_quota_log.trace,
       "record request: ctx:{}, key:{}, value:{}, bytes:{}",
@@ -350,6 +473,7 @@ ss::future quota_manager::throttle_fetch_tp(
     };
     auto [key, value] = _translator.find_quota(ctx);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
@@ -372,6 +496,7 @@ ss::future quota_manager::throttle_fetch_tp(
     });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
diff --git a/src/v/kafka/server/quota_manager.h b/src/v/kafka/server/quota_manager.h
index 2ac2816cd310..018d6a1a866f 100644
--- a/src/v/kafka/server/quota_manager.h
+++ b/src/v/kafka/server/quota_manager.h
@@ -31,11 +31,15 @@
 #include
 #include
+#include
 #include
 #include
 
 namespace kafka {
 
+template
+class client_quotas_probe;
+
 // quota_manager tracks quota usage
 //
 // TODO:
@@ -138,6 +142,7 @@ class quota_manager : public ss::peering_sharded_service {
 
     client_quotas_t& _client_quotas;
     client_quota_translator _translator;
+    std::unique_ptr> _probe;
 
     ss::timer<> _gc_timer;
     clock::duration _gc_freq;

From fb8269eac6b468f911260457c83fe990e398fdeb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?=
Date: Fri, 7 Jun 2024 15:12:08 +0100
Subject: [PATCH 3/4] dt/quotas: refactor producer/consumer creation

There are many places in this file where producers and consumers are
created with the same arguments, so this refactors the tests to create
the producers and consumers through shared factory methods.

This also sets retries=2 and request_timeout_ms=60000 consistently on
all producers. An earlier commit set these values for one of the tests
in response to CI failures, so it makes sense to be defensive and apply
the same configuration more widely.
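For reference, a call site now reduces to something like the following
(a sketch based on the tests below; the client id and extra kwargs are
examples only):

    # acks, retries=2, request_timeout_ms=60000, etc. come from the
    # factory; extra kwargs are forwarded to KafkaProducer.
    producer = self.make_producer("shared_client_id",
                                  max_in_flight_requests_per_connection=1)
    consumer = self.make_consumer("shared_client_id")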
---
 tests/rptest/tests/cluster_quota_test.py | 189 +++++++----------------
 1 file changed, 54 insertions(+), 135 deletions(-)

diff --git a/tests/rptest/tests/cluster_quota_test.py b/tests/rptest/tests/cluster_quota_test.py
index cc6035361393..1ca0e6cc3820 100644
--- a/tests/rptest/tests/cluster_quota_test.py
+++ b/tests/rptest/tests/cluster_quota_test.py
@@ -215,6 +215,32 @@ def fetch(self, consumer, messages_amount, timeout_sec=300):
             if now > deadline:
                 raise TimeoutError()
 
+    def make_producer(self,
+                      client_id: Optional[str] = None,
+                      *args,
+                      **kwargs) -> KafkaProducer:
+        return KafkaProducer(acks="all",
+                             bootstrap_servers=self.leader_node,
+                             value_serializer=str.encode,
+                             retries=2,
+                             request_timeout_ms=60000,
+                             client_id=client_id,
+                             *args,
+                             **kwargs)
+
+    def make_consumer(
+            self,
+            client_id: Optional[str] = None,
+            max_partition_fetch_bytes: Optional[int] = None) -> KafkaConsumer:
+        mpfb = max_partition_fetch_bytes if max_partition_fetch_bytes else self.max_partition_fetch_bytes
+        return KafkaConsumer(self.topic,
+                             bootstrap_servers=self.leader_node,
+                             client_id=client_id,
+                             consumer_timeout_ms=1000,
+                             max_partition_fetch_bytes=mpfb,
+                             auto_offset_reset='earliest',
+                             enable_auto_commit=False)
+
     @cluster(num_nodes=3)
     def test_client_group_produce_rate_throttle_mechanism(self):
         """
@@ -237,11 +263,7 @@ def test_client_group_produce_rate_throttle_mechanism(self):
             ]
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="producer_group_alone_producer")
+        producer = self.make_producer("producer_group_alone_producer")
 
         # Produce under the limit
         self.produce(producer, self.under_group_quota_message_amount)
@@ -251,16 +273,8 @@ def test_client_group_produce_rate_throttle_mechanism(self):
         self.produce(producer, self.break_group_quota_message_amount)
         self.check_producer_throttled(producer)
 
-        producer_1 = KafkaProducer(acks="all",
-                                   bootstrap_servers=self.leader_node,
-                                   value_serializer=str.encode,
-                                   retries=1,
-                                   client_id="producer_group_multiple_1")
-
-        producer_2 = KafkaProducer(acks="all",
-                                   bootstrap_servers=self.leader_node,
-                                   value_serializer=str.encode,
-                                   client_id="producer_group_multiple_2")
+        producer_1 = self.make_producer("producer_group_multiple_1")
+        producer_2 = self.make_producer("producer_group_multiple_2")
 
         # Produce under the limit
         self.produce(producer_1, self.under_group_quota_message_amount)
@@ -282,10 +296,7 @@ def test_client_group_produce_rate_throttle_mechanism(self):
             }
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 client_id="new_producer_group_producer")
+        producer = self.make_producer("new_producer_group_producer")
 
         self.produce(producer, self.break_group_quota_message_amount)
         self.check_producer_throttled(producer)
@@ -312,42 +323,17 @@ def test_client_group_consume_rate_throttle_mechanism(self):
             ]
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1)
+        producer = self.make_producer()
 
         self.produce(producer, self.break_group_quota_message_amount * 2)
 
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_alone",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer = self.make_consumer("consumer_alone")
 
         # Fetch more the limit
         self.fetch(consumer, self.break_group_quota_message_amount)
         self.check_consumer_throttled(consumer)
 
-        consumer_1 = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_multiple_1",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
-
-        consumer_2 = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_multiple_2",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer_1 = self.make_consumer("consumer_multiple_1")
+        consumer_2 = self.make_consumer("consumer_multiple_2")
 
         # Consume under the limit
         self.fetch(consumer_2, 10)
@@ -369,14 +355,7 @@ def test_client_group_consume_rate_throttle_mechanism(self):
             }
         })
 
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="new_consumer",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer = self.make_consumer("new_consumer")
 
         self.fetch(consumer, self.break_group_quota_message_amount)
         self.check_consumer_throttled(consumer)
@@ -388,21 +367,8 @@ def test_client_response_throttle_mechanism(self):
         """
         self.init_test_data()
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=2,
-                                 request_timeout_ms=60000,
-                                 client_id="producer")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("producer")
+        consumer = self.make_consumer("consumer")
 
         self.produce(producer, self.break_default_quota_message_amount * 2)
 
@@ -421,22 +387,13 @@ def test_client_response_throttle_mechanism_applies_to_next_request(self):
         """
         self.init_test_data()
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="producer")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer",
-            consumer_timeout_ms=1000,
-            # Set the max fetch size such that the first fetch is above the quota limit AND completes in a single request
+        producer = self.make_producer("producer")
+
+        # Set the max fetch size such that the first fetch is above the quota limit AND completes in a single request
+        consumer = self.make_consumer(
+            "consumer",
             max_partition_fetch_bytes=self.break_default_quota_message_amount *
-            self.message_size,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+            self.message_size)
 
         # Ensure we have plenty of data to consume
         self.produce(producer, self.break_default_quota_message_amount * 2)
@@ -466,20 +423,8 @@ def test_client_response_and_produce_throttle_mechanism(self):
             }
         })
         # Producer and Consumer same client_id
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="throttle_producer_only")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="throttle_producer_only",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("throttle_producer_only")
+        consumer = self.make_consumer("throttle_producer_only")
 
         # Produce more than limit
         self.produce(producer, self.break_group_quota_message_amount)
@@ -502,20 +447,8 @@ def test_client_response_and_produce_throttle_mechanism(self):
             }
         })
         # Producer and Consumer same client_id
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="throttle_consumer_only")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="throttle_consumer_only",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("throttle_consumer_only")
+        consumer = self.make_consumer("throttle_consumer_only")
 
         # Fetch more than limit
         self.fetch(consumer, self.break_group_quota_message_amount)
@@ -540,27 +473,13 @@ def test_throttling_ms_enforcement_is_per_connection(self):
             {"max_kafka_throttle_delay_ms": "1000"})
 
         # Create two producers sharing a client.id
-        def make_producer():
-            return KafkaProducer(
-                acks="all",
-                bootstrap_servers=self.leader_node,
-                value_serializer=str.encode,
-                retries=1,
-                client_id="shared_client_id",
-                max_request_size=1 * GB,
-                max_in_flight_requests_per_connection=1,
-            )
-
-        producer1 = make_producer()
-        producer2 = make_producer()
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="shared_client_id",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer1 = self.make_producer("shared_client_id",
+                                       max_request_size=1 * GB,
+                                       max_in_flight_requests_per_connection=1)
+        producer2 = self.make_producer("shared_client_id",
+                                       max_request_size=1 * GB,
+                                       max_in_flight_requests_per_connection=1)
+        consumer = self.make_consumer("shared_client_id")
 
         # Produce above the produce quota limit
         self.produce(producer1, 1, self.large_msg)

From c53cb026e7fe539bd8ed43fc396f4a81db7907f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?=
Date: Fri, 7 Jun 2024 13:53:40 +0100
Subject: [PATCH 4/4] dt/quotas: test client quota metrics

---
 tests/rptest/tests/cluster_quota_test.py | 118 ++++++++++++++++++++++-
 1 file changed, 117 insertions(+), 1 deletion(-)

diff --git a/tests/rptest/tests/cluster_quota_test.py b/tests/rptest/tests/cluster_quota_test.py
index 1ca0e6cc3820..e1037db6809c 100644
--- a/tests/rptest/tests/cluster_quota_test.py
+++ b/tests/rptest/tests/cluster_quota_test.py
@@ -11,10 +11,11 @@
 import time
 import random
 import string
+from typing import NamedTuple, Optional
 
 from ducktape.utils.util import wait_until
 from rptest.clients.rpk import RpkTool
-from rptest.services.redpanda import ResourceSettings, LoggingConfig
+from rptest.services.redpanda import MetricSample, MetricSamples, MetricsEndpoint, ResourceSettings, LoggingConfig
 from kafka import KafkaProducer, KafkaConsumer, TopicPartition
 from rptest.clients.kcl import RawKCL, KclCreateTopicsRequestTopic, \
     KclCreatePartitionsRequestTopic
@@ -25,6 +26,20 @@
 GB = 1_000_000_000
 
 
+class ExpectedMetric(NamedTuple):
+    labels: dict[str, str]
+
+
+class ExpectedMetrics(NamedTuple):
+    metrics: list[ExpectedMetric]
+
+    def has_matching(self, got_labels: dict[str, str]) -> bool:
+        """
+        Returns true if there is a metric in the list of expected metrics with a matching label
+        """
+        return any(expected.labels == got_labels for expected in self.metrics)
+
+
 class ClusterQuotaPartitionMutationTest(RedpandaTest):
     """
     Ducktape tests for partition mutation quota
@@ -517,3 +532,104 @@ def test_throttling_ms_enforcement_is_per_connection(self):
             timeout_sec=10,
             err_msg="Subsequent messages should be throttled broker-side",
         )
+
+    def get_metrics(self, metric: str) -> list[MetricSample]:
+        metrics = self.redpanda.metrics_sample(
+            metric, metrics_endpoint=MetricsEndpoint.METRICS)
+
+        assert metrics, f"Metric is missing: {metric}"
+        self.logger.debug(f"Samples for {metric}: {metrics.samples}")
+        return metrics.samples
+
+    @cluster(num_nodes=1)
+    def test_client_quota_metrics(self):
+        self.init_test_data()
+        self.redpanda.set_cluster_config({
+            "kafka_client_group_byte_rate_quota": {
+                "group_name": "producer_with_group",
+                "clients_prefix": "producer_with_group",
+                "quota": self.target_group_quota_byte_rate,
+            },
+            "kafka_client_group_fetch_byte_rate_quota": {
+                "group_name": "consumer_with_group",
+                "clients_prefix": "consumer_with_group",
+                "quota": self.target_group_quota_byte_rate,
+            },
+            "target_quota_byte_rate":
+            self.target_default_quota_byte_rate,
+            "target_fetch_quota_byte_rate":
+            self.target_default_quota_byte_rate,
+        })
+
+        producer_with_group = self.make_producer("producer_with_group")
+        consumer_with_group = self.make_consumer("consumer_with_group")
+        unknown_producer = self.make_producer("unknown_producer")
+        unknown_consumer = self.make_consumer("unknown_consumer")
+
+        # When the test topic is created, throughput for this metric is recorded
+        expected_pm_metrics = ExpectedMetrics([
+            ExpectedMetric({
+                'redpanda_quota_rule': 'not_applicable',
+                'redpanda_quota_type': 'partition_mutation_quota',
+            }),
+        ])
+
+        # When we produce/fetch to/from the cluster, the metrics with these labels should update
+        expected_tput_metrics = ExpectedMetrics([
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_prefix',
+                'redpanda_quota_type': 'produce_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_prefix',
+                'redpanda_quota_type': 'fetch_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_default',
+                'redpanda_quota_type': 'produce_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_default',
+                'redpanda_quota_type': 'fetch_quota',
+            }),
+        ])
+
+        def check_sample(sample: MetricSample, assertion: bool):
+            assert assertion, f"Unexpected sample: {sample}."
+ + self.redpanda.logger.debug("Produce under the limit") + self.produce(producer_with_group, 1) + self.fetch(consumer_with_group, 1) + self.produce(unknown_producer, 1) + self.fetch(unknown_consumer, 1) + + self.redpanda.logger.debug( + "Assert that throttling time is 0 when under the limit") + for sample in self.get_metrics( + "vectorized_kafka_quotas_client_quota_throttle_time_sum"): + check_sample(sample, sample.value == 0) + + self.redpanda.logger.debug( + "Assert that throughput is recorded with the expected labels") + for sample in self.get_metrics("client_quota_throughput_sum"): + all_expected_metrics = ExpectedMetrics( + expected_pm_metrics.metrics + expected_tput_metrics.metrics) + if all_expected_metrics.has_matching(sample.labels): + check_sample(sample, sample.value > 0) + else: + check_sample(sample, sample.value == 0) + + self.redpanda.logger.debug("Produce over the limit") + self.produce(producer_with_group, + self.break_group_quota_message_amount) + self.fetch(consumer_with_group, self.break_group_quota_message_amount) + self.produce(unknown_producer, self.break_default_quota_message_amount) + self.fetch(unknown_consumer, self.break_default_quota_message_amount) + + self.redpanda.logger.debug( + "Assert that throttling time is positive when over the limit") + for sample in self.get_metrics("client_quota_throttle_time_sum"): + if expected_tput_metrics.has_matching(sample.labels): + check_sample(sample, sample.value > 0) + else: + check_sample(sample, sample.value == 0)