diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h
index 31d4c00700de..c0c54c9fa048 100644
--- a/src/v/kafka/server/client_quota_translator.h
+++ b/src/v/kafka/server/client_quota_translator.h
@@ -65,6 +65,11 @@ enum class client_quota_type {
   partition_mutation_quota
 };
 
+static constexpr std::array all_client_quota_types = {
+  client_quota_type::produce_quota,
+  client_quota_type::fetch_quota,
+  client_quota_type::partition_mutation_quota};
+
 std::ostream& operator<<(std::ostream&, client_quota_type);
 
 struct client_quota_request_ctx {
@@ -85,6 +90,14 @@ enum class client_quota_rule {
   kafka_client_id
 };
 
+static constexpr std::array all_client_quota_rules = {
+  client_quota_rule::not_applicable,
+  client_quota_rule::kafka_client_default,
+  client_quota_rule::cluster_client_default,
+  client_quota_rule::kafka_client_prefix,
+  client_quota_rule::cluster_client_prefix,
+  client_quota_rule::kafka_client_id};
+
 std::ostream& operator<<(std::ostream&, client_quota_rule);
 
 struct client_quota_value {
diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc
index 2d9c2fc6dafd..c7bcd0438b54 100644
--- a/src/v/kafka/server/quota_manager.cc
+++ b/src/v/kafka/server/quota_manager.cc
@@ -16,10 +16,14 @@
 #include "kafka/server/atomic_token_bucket.h"
 #include "kafka/server/client_quota_translator.h"
 #include "kafka/server/logger.h"
+#include "metrics/metrics.h"
+#include "prometheus/prometheus_sanitize.h"
 #include "ssx/future-util.h"
+#include "utils/log_hist.h"
 
+#include
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +31,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -35,6 +40,114 @@ using namespace std::chrono_literals;
 
 namespace kafka {
 
+template<typename clock>
+class client_quotas_probe {
+public:
+    client_quotas_probe() = default;
+    client_quotas_probe(const client_quotas_probe&) = delete;
+    client_quotas_probe& operator=(const client_quotas_probe&) = delete;
+    client_quotas_probe(client_quotas_probe&&) = delete;
+    client_quotas_probe& operator=(client_quotas_probe&&) = delete;
+    ~client_quotas_probe() noexcept = default;
+
+    void setup_metrics() {
+        namespace sm = ss::metrics;
+
+        auto metric_defs = std::vector<sm::metric_definition>{};
+        metric_defs.reserve(
+          all_client_quota_types.size() * all_client_quota_rules.size() * 2);
+
+        auto rule_label = metrics::make_namespaced_label("quota_rule");
+        auto quota_type_label = metrics::make_namespaced_label("quota_type");
+
+        for (auto quota_type : all_client_quota_types) {
+            for (auto rule : all_client_quota_rules) {
+                metric_defs.emplace_back(
+                  sm::make_histogram(
+                    "client_quota_throttle_time",
+                    [this, rule, quota_type] {
+                        return get_throttle_time(rule, quota_type);
+                    },
+                    sm::description(
+                      "Client quota throttling delay per rule and "
+                      "quota type (in seconds)"),
+                    {rule_label(rule), quota_type_label(quota_type)})
+                    .aggregate({sm::shard_label}));
+                metric_defs.emplace_back(
+                  sm::make_histogram(
+                    "client_quota_throughput",
+                    [this, rule, quota_type] {
+                        return get_throughput(rule, quota_type);
+                    },
+                    sm::description(
+                      "Client quota throughput per rule and quota type"),
+                    {rule_label(rule), quota_type_label(quota_type)})
+                    .aggregate({sm::shard_label}));
+            }
+        }
+
+        auto group_name = prometheus_sanitize::metrics_name("kafka:quotas");
+        if (!config::shard_local_cfg().disable_metrics()) {
+            _internal_metrics.add_group(group_name, metric_defs);
+        }
+        if (!config::shard_local_cfg().disable_public_metrics()) {
+            _public_metrics.add_group(group_name, metric_defs);
+        }
+    }
+
+    void record_throttle_time(
+      client_quota_rule rule, client_quota_type qt, clock::duration t) {
+        auto& gm = (*this)(rule, qt);
+        // The Kafka response field `ThrottleTimeMs` is in milliseconds, so
+        // round the metric to milliseconds as well.
+        auto t_ms = std::chrono::duration_cast<std::chrono::milliseconds>(t);
+        gm.throttle_time.record(t_ms);
+    }
+    void record_throughput(
+      client_quota_rule rule, client_quota_type qt, uint64_t count) {
+        auto& gm = (*this)(rule, qt);
+        gm.throughput.record(count);
+    }
+
+private:
+    ss::metrics::histogram
+    get_throttle_time(client_quota_rule rule, client_quota_type qt) const {
+        auto& gm = (*this)(rule, qt);
+        return gm.throttle_time.client_quota_histogram_logform();
+    }
+    ss::metrics::histogram
+    get_throughput(client_quota_rule rule, client_quota_type qt) const {
+        auto& gm = (*this)(rule, qt);
+        return gm.throughput.internal_histogram_logform();
+    }
+
+    struct granular_metrics {
+        log_hist_client_quota throttle_time;
+        log_hist_internal throughput;
+    };
+
+    granular_metrics&
+    operator()(client_quota_rule rule, client_quota_type type) {
+        return _metrics[static_cast<size_t>(rule)][static_cast<size_t>(type)];
+    }
+
+    const granular_metrics&
+    operator()(client_quota_rule rule, client_quota_type type) const {
+        return _metrics[static_cast<size_t>(rule)][static_cast<size_t>(type)];
+    }
+
+    // Assume the enum values are in sequence: [0, all_*.size())
+    static_assert(static_cast<size_t>(all_client_quota_rules[0]) == 0);
+    static_assert(static_cast<size_t>(all_client_quota_types[0]) == 0);
+    using metrics_container_t = std::array<
+      std::array<granular_metrics, all_client_quota_types.size()>,
+      all_client_quota_rules.size()>;
+
+    metrics::internal_metric_groups _internal_metrics;
+    metrics::public_metric_groups _public_metrics;
+    metrics_container_t _metrics{};
+};
+
 using clock = quota_manager::clock;
 
 quota_manager::quota_manager(
@@ -58,9 +171,12 @@ quota_manager::~quota_manager() { _gc_timer.cancel(); }
 
 ss::future<> quota_manager::stop() {
     _gc_timer.cancel();
     co_await _gate.close();
+    _probe.reset();
 }
 
 ss::future<> quota_manager::start() {
+    _probe = std::make_unique<client_quotas_probe<clock>>();
+    _probe->setup_metrics();
     if (ss::this_shard_id() == _client_quotas.shard_id()) {
         co_await _client_quotas.reset(client_quotas_map_t{});
         _gc_timer.arm_periodic(_gc_freq);
@@ -208,7 +324,9 @@ ss::future<std::chrono::milliseconds> quota_manager::record_partition_mutations(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, mutations);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
@@ -236,6 +354,7 @@
       });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "request: ctx:{}, key:{}, value:{}, mutations: {}, delay:{}"
@@ -276,7 +395,9 @@ ss::future<clock::duration> quota_manager::record_produce_tp_and_throttle(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, bytes);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
@@ -300,6 +421,7 @@
       });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "request: ctx:{}, key:{}, value:{}, bytes: {}, delay:{}, "
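Reviewer note, not part of the patch: a rough sketch of the series cardinality this probe introduces. Each shard exposes one throttle-time histogram and one throughput histogram per (quota rule, quota type) pair, i.e. 6 rules x 3 types x 2 metrics = 36 label combinations per shard before the shard-label aggregation. The label names below are taken from the ducktape test at the end of this diff; the snippet is only an illustration.

```python
from itertools import product

# Mirrors all_client_quota_rules and all_client_quota_types above.
rules = [
    "not_applicable", "kafka_client_default", "cluster_client_default",
    "kafka_client_prefix", "cluster_client_prefix", "kafka_client_id"
]
types = ["produce_quota", "fetch_quota", "partition_mutation_quota"]
names = ["client_quota_throttle_time", "client_quota_throughput"]

series = [{
    "__name__": name,
    "redpanda_quota_rule": rule,
    "redpanda_quota_type": qt,
} for name, rule, qt in product(names, rules, types)]

assert len(series) == 2 * 6 * 3  # 36 combinations per shard
```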
@@ -322,6 +444,7 @@ ss::future<> quota_manager::record_fetch_tp(
       .client_id = client_id,
     };
     auto [key, value] = _translator.find_quota(ctx);
+    _probe->record_throughput(value.rule, ctx.q_type, bytes);
     vlog(
       client_quota_log.trace,
       "record request: ctx:{}, key:{}, value:{}, bytes:{}",
@@ -351,6 +474,7 @@ ss::future<clock::duration> quota_manager::throttle_fetch_tp(
     };
     auto [key, value] = _translator.find_quota(ctx);
     if (!value.limit) {
+        _probe->record_throttle_time(value.rule, ctx.q_type, 0ms);
         vlog(
           client_quota_log.trace,
           "throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
@@ -373,6 +497,7 @@
       });
 
     auto capped_delay = cap_to_max_delay(key, delay);
+    _probe->record_throttle_time(value.rule, ctx.q_type, capped_delay);
     vlog(
       client_quota_log.trace,
       "throttle request: ctx:{}, key:{}, value:{}, delay:{}, "
diff --git a/src/v/kafka/server/quota_manager.h b/src/v/kafka/server/quota_manager.h
index 2ac2816cd310..018d6a1a866f 100644
--- a/src/v/kafka/server/quota_manager.h
+++ b/src/v/kafka/server/quota_manager.h
@@ -31,11 +31,15 @@
 #include
 #include
+#include
 #include
 #include
 
 namespace kafka {
 
+template<typename clock>
+class client_quotas_probe;
+
 // quota_manager tracks quota usage
 //
 // TODO:
@@ -138,6 +142,7 @@ class quota_manager : public ss::peering_sharded_service<quota_manager> {
     client_quotas_t& _client_quotas;
 
     client_quota_translator _translator;
+    std::unique_ptr<client_quotas_probe<clock>> _probe;
 
     ss::timer<> _gc_timer;
     clock::duration _gc_freq;
diff --git a/src/v/utils/log_hist.cc b/src/v/utils/log_hist.cc
index 2ab590ed025e..ddf132956e1b 100644
--- a/src/v/utils/log_hist.cc
+++ b/src/v/utils/log_hist.cc
@@ -104,9 +104,23 @@ log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
     return seastar_histogram_logform();
 }
 
+template<
+  typename duration_t,
+  int number_of_buckets,
+  uint64_t first_bucket_upper_bound>
+seastar::metrics::histogram
+log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
+  client_quota_histogram_logform() const {
+    using client_quota_config = logform_config<1'000l, 1ul, 15>;
+
+    return seastar_histogram_logform<client_quota_config>();
+}
+
 // Explicit instantiation for log_hist_public
 template class log_hist;
 // Explicit instantiation for log_hist_internal
 template class log_hist;
 // Explicit instantiation for log_hist_read_dist
 template class log_hist;
+// Explicit instantiation for log_hist_client_quota
+template class log_hist<std::chrono::milliseconds, 16, 1ul>;
diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h
index e8922a3e709f..c9e82c20783c 100644
--- a/src/v/utils/log_hist.h
+++ b/src/v/utils/log_hist.h
@@ -208,6 +208,15 @@ class log_hist {
      */
     seastar::metrics::histogram read_dist_histogram_logform() const;
 
+    /*
+     * Generates a Prometheus histogram with 15 buckets. The first bucket has an
+     * upper bound of 1 - 1 and subsequent buckets have an upper bound of 2
+     * times the upper bound of the previous bucket.
+     *
+     * This is the histogram type used for the Kafka client quota distribution.
+     */
+    seastar::metrics::histogram client_quota_histogram_logform() const;
+
 private:
     friend measurement;
 
@@ -241,3 +250,11 @@ using log_hist_internal = log_hist;
  * the first bucket to greater than 91 days in the last bucket.
  */
 using log_hist_read_dist = log_hist;
+
+/*
+ * This histogram has units of milliseconds instead of microseconds, and is
+ * used for measuring the Kafka client quota delays on the scale of less than
+ * 1 millisecond in the first bucket to greater than 32 seconds in the last
+ * bucket.
+ */
+using log_hist_client_quota = log_hist<std::chrono::milliseconds, 16, 1ul>;
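Reviewer note, not from the patch: a rough picture of the bucket layout the new client-quota logform reports. `logform_config<1'000l, 1ul, 15>` reads as a scale of 1000 (millisecond samples reported in seconds), a first bucket bound of roughly 1 ms, and 15 buckets whose bounds double, per the comment added to log_hist.h above. The exact first-bucket semantics live in logform_config, which is not shown in this diff, so the numbers below are an approximation.

```python
# Approximate upper bounds, in seconds, of the 15 Prometheus buckets,
# assuming the first bound is ~1 ms and every subsequent bound doubles.
scale = 1_000        # per logform_config<1'000l, ...>: milliseconds -> seconds
first_bound_ms = 1   # per logform_config<..., 1ul, 15>
bounds_s = [first_bound_ms * 2**i / scale for i in range(15)]
print(bounds_s)      # [0.001, 0.002, 0.004, ..., 16.384]
```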
diff --git a/tests/rptest/tests/cluster_quota_test.py b/tests/rptest/tests/cluster_quota_test.py
index cc6035361393..e1037db6809c 100644
--- a/tests/rptest/tests/cluster_quota_test.py
+++ b/tests/rptest/tests/cluster_quota_test.py
@@ -11,10 +11,11 @@
 import time
 import random
 import string
+from typing import NamedTuple, Optional
 
 from ducktape.utils.util import wait_until
 from rptest.clients.rpk import RpkTool
-from rptest.services.redpanda import ResourceSettings, LoggingConfig
+from rptest.services.redpanda import MetricSample, MetricSamples, MetricsEndpoint, ResourceSettings, LoggingConfig
 from kafka import KafkaProducer, KafkaConsumer, TopicPartition
 from rptest.clients.kcl import RawKCL, KclCreateTopicsRequestTopic, \
     KclCreatePartitionsRequestTopic
@@ -25,6 +26,20 @@
 GB = 1_000_000_000
 
 
+class ExpectedMetric(NamedTuple):
+    labels: dict[str, str]
+
+
+class ExpectedMetrics(NamedTuple):
+    metrics: list[ExpectedMetric]
+
+    def has_matching(self, got_labels: dict[str, str]) -> bool:
+        """
+        Returns true if there is a metric in the list of expected metrics with a matching label
+        """
+        return any(expected.labels == got_labels for expected in self.metrics)
+
+
 class ClusterQuotaPartitionMutationTest(RedpandaTest):
     """
     Ducktape tests for partition mutation quota
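The two NamedTuples above are plain label matchers: has_matching() succeeds only when a sample's full label dict equals one of the expected entries. A small illustration, not part of the test itself, assuming the two classes above are in scope:

```python
expected = ExpectedMetrics([
    ExpectedMetric({
        'redpanda_quota_rule': 'cluster_client_default',
        'redpanda_quota_type': 'produce_quota',
    }),
])

# An exact label dict matches; a partial or different dict does not.
assert expected.has_matching({
    'redpanda_quota_rule': 'cluster_client_default',
    'redpanda_quota_type': 'produce_quota',
})
assert not expected.has_matching({'redpanda_quota_rule': 'not_applicable'})
```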
@@ -215,6 +230,32 @@ def fetch(self, consumer, messages_amount, timeout_sec=300):
             if now > deadline:
                 raise TimeoutError()
 
+    def make_producer(self,
+                      client_id: Optional[str] = None,
+                      *args,
+                      **kwargs) -> KafkaProducer:
+        return KafkaProducer(acks="all",
+                             bootstrap_servers=self.leader_node,
+                             value_serializer=str.encode,
+                             retries=2,
+                             request_timeout_ms=60000,
+                             client_id=client_id,
+                             *args,
+                             **kwargs)
+
+    def make_consumer(
+            self,
+            client_id: Optional[str] = None,
+            max_partition_fetch_bytes: Optional[int] = None) -> KafkaConsumer:
+        mpfb = max_partition_fetch_bytes if max_partition_fetch_bytes else self.max_partition_fetch_bytes
+        return KafkaConsumer(self.topic,
+                             bootstrap_servers=self.leader_node,
+                             client_id=client_id,
+                             consumer_timeout_ms=1000,
+                             max_partition_fetch_bytes=mpfb,
+                             auto_offset_reset='earliest',
+                             enable_auto_commit=False)
+
     @cluster(num_nodes=3)
     def test_client_group_produce_rate_throttle_mechanism(self):
         """
@@ -237,11 +278,7 @@ def test_client_group_produce_rate_throttle_mechanism(self):
             ]
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="producer_group_alone_producer")
+        producer = self.make_producer("producer_group_alone_producer")
 
         # Produce under the limit
         self.produce(producer, self.under_group_quota_message_amount)
@@ -251,16 +288,8 @@ def test_client_group_produce_rate_throttle_mechanism(self):
         self.produce(producer, self.break_group_quota_message_amount)
         self.check_producer_throttled(producer)
 
-        producer_1 = KafkaProducer(acks="all",
-                                   bootstrap_servers=self.leader_node,
-                                   value_serializer=str.encode,
-                                   retries=1,
-                                   client_id="producer_group_multiple_1")
-
-        producer_2 = KafkaProducer(acks="all",
-                                   bootstrap_servers=self.leader_node,
-                                   value_serializer=str.encode,
-                                   client_id="producer_group_multiple_2")
+        producer_1 = self.make_producer("producer_group_multiple_1")
+        producer_2 = self.make_producer("producer_group_multiple_2")
 
         # Produce under the limit
         self.produce(producer_1, self.under_group_quota_message_amount)
@@ -282,10 +311,7 @@ def test_client_group_produce_rate_throttle_mechanism(self):
             }
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 client_id="new_producer_group_producer")
+        producer = self.make_producer("new_producer_group_producer")
 
         self.produce(producer, self.break_group_quota_message_amount)
         self.check_producer_throttled(producer)
@@ -312,42 +338,17 @@ def test_client_group_consume_rate_throttle_mechanism(self):
             ]
         })
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1)
+        producer = self.make_producer()
 
         self.produce(producer, self.break_group_quota_message_amount * 2)
 
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_alone",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer = self.make_consumer("consumer_alone")
 
         # Fetch more the limit
         self.fetch(consumer, self.break_group_quota_message_amount)
         self.check_consumer_throttled(consumer)
 
-        consumer_1 = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_multiple_1",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
-
-        consumer_2 = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer_multiple_2",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer_1 = self.make_consumer("consumer_multiple_1")
+        consumer_2 = self.make_consumer("consumer_multiple_2")
 
         # Consume under the limit
         self.fetch(consumer_2, 10)
@@ -369,14 +370,7 @@ def test_client_group_consume_rate_throttle_mechanism(self):
             }
         })
 
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="new_consumer",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        consumer = self.make_consumer("new_consumer")
 
         self.fetch(consumer, self.break_group_quota_message_amount)
         self.check_consumer_throttled(consumer)
@@ -388,21 +382,8 @@ def test_client_response_throttle_mechanism(self):
         """
         self.init_test_data()
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=2,
-                                 request_timeout_ms=60000,
-                                 client_id="producer")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("producer")
+        consumer = self.make_consumer("consumer")
 
         self.produce(producer, self.break_default_quota_message_amount * 2)
 
@@ -421,22 +402,13 @@ def test_client_response_throttle_mechanism_applies_to_next_request(self):
         """
         self.init_test_data()
 
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="producer")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="consumer",
-            consumer_timeout_ms=1000,
-            # Set the max fetch size such that the first fetch is above the quota limit AND completes in a single request
+        producer = self.make_producer("producer")
+
+        # Set the max fetch size such that the first fetch is above the quota limit AND completes in a single request
+        consumer = self.make_consumer(
+            "consumer",
             max_partition_fetch_bytes=self.break_default_quota_message_amount *
-            self.message_size,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+            self.message_size)
 
         # Ensure we have plenty of data to consume
         self.produce(producer, self.break_default_quota_message_amount * 2)
@@ -466,20 +438,8 @@ def test_client_response_and_produce_throttle_mechanism(self):
             }
         })
         # Producer and Consumer same client_id
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="throttle_producer_only")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="throttle_producer_only",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("throttle_producer_only")
+        consumer = self.make_consumer("throttle_producer_only")
 
         # Produce more than limit
         self.produce(producer, self.break_group_quota_message_amount)
@@ -502,20 +462,8 @@ def test_client_response_and_produce_throttle_mechanism(self):
             }
         })
         # Producer and Consumer same client_id
-        producer = KafkaProducer(acks="all",
-                                 bootstrap_servers=self.leader_node,
-                                 value_serializer=str.encode,
-                                 retries=1,
-                                 client_id="throttle_consumer_only")
-
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="throttle_consumer_only",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer = self.make_producer("throttle_consumer_only")
+        consumer = self.make_consumer("throttle_consumer_only")
 
         # Fetch more than limit
         self.fetch(consumer, self.break_group_quota_message_amount)
@@ -540,27 +488,13 @@ def test_throttling_ms_enforcement_is_per_connection(self):
             {"max_kafka_throttle_delay_ms": "1000"})
 
         # Create two producers sharing a client.id
-        def make_producer():
-            return KafkaProducer(
-                acks="all",
-                bootstrap_servers=self.leader_node,
-                value_serializer=str.encode,
-                retries=1,
-                client_id="shared_client_id",
-                max_request_size=1 * GB,
-                max_in_flight_requests_per_connection=1,
-            )
-
-        producer1 = make_producer()
-        producer2 = make_producer()
-        consumer = KafkaConsumer(
-            self.topic,
-            bootstrap_servers=self.leader_node,
-            client_id="shared_client_id",
-            consumer_timeout_ms=1000,
-            max_partition_fetch_bytes=self.max_partition_fetch_bytes,
-            auto_offset_reset='earliest',
-            enable_auto_commit=False)
+        producer1 = self.make_producer("shared_client_id",
+                                       max_request_size=1 * GB,
+                                       max_in_flight_requests_per_connection=1)
+        producer2 = self.make_producer("shared_client_id",
+                                       max_request_size=1 * GB,
+                                       max_in_flight_requests_per_connection=1)
+        consumer = self.make_consumer("shared_client_id")
 
         # Produce above the produce quota limit
         self.produce(producer1, 1, self.large_msg)
@@ -598,3 +532,104 @@ def make_producer():
             timeout_sec=10,
             err_msg="Subsequent messages should be throttled broker-side",
         )
+
+    def get_metrics(self, metric: str) -> list[MetricSample]:
+        metrics = self.redpanda.metrics_sample(
+            metric, metrics_endpoint=MetricsEndpoint.METRICS)
+
+        assert metrics, f"Metric is missing: {metric}"
+        self.logger.debug(f"Samples for {metric}: {metrics.samples}")
+        return metrics.samples
+
+    @cluster(num_nodes=1)
+    def test_client_quota_metrics(self):
+        self.init_test_data()
+        self.redpanda.set_cluster_config({
+            "kafka_client_group_byte_rate_quota": {
+                "group_name": "producer_with_group",
+                "clients_prefix": "producer_with_group",
+                "quota": self.target_group_quota_byte_rate,
+            },
+            "kafka_client_group_fetch_byte_rate_quota": {
+                "group_name": "consumer_with_group",
+                "clients_prefix": "consumer_with_group",
+                "quota": self.target_group_quota_byte_rate,
+            },
+            "target_quota_byte_rate":
+            self.target_default_quota_byte_rate,
+            "target_fetch_quota_byte_rate":
+            self.target_default_quota_byte_rate,
+        })
+
+        producer_with_group = self.make_producer("producer_with_group")
+        consumer_with_group = self.make_consumer("consumer_with_group")
+        unknown_producer = self.make_producer("unknown_producer")
+        unknown_consumer = self.make_consumer("unknown_consumer")
+
+        # When the test topic is created, throughput for this metric is recorded
+        expected_pm_metrics = ExpectedMetrics([
+            ExpectedMetric({
+                'redpanda_quota_rule': 'not_applicable',
+                'redpanda_quota_type': 'partition_mutation_quota',
+            }),
+        ])
+
+        # When we produce/fetch to/from the cluster, the metrics with these labels should update
+        expected_tput_metrics = ExpectedMetrics([
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_prefix',
+                'redpanda_quota_type': 'produce_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_prefix',
+                'redpanda_quota_type': 'fetch_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_default',
+                'redpanda_quota_type': 'produce_quota',
+            }),
+            ExpectedMetric({
+                'redpanda_quota_rule': 'cluster_client_default',
+                'redpanda_quota_type': 'fetch_quota',
+            }),
+        ])
+
+        def check_sample(sample: MetricSample, assertion: bool):
+            assert assertion, f"Unexpected sample: {sample}."
+
+        self.redpanda.logger.debug("Produce under the limit")
+        self.produce(producer_with_group, 1)
+        self.fetch(consumer_with_group, 1)
+        self.produce(unknown_producer, 1)
+        self.fetch(unknown_consumer, 1)
+
+        self.redpanda.logger.debug(
+            "Assert that throttling time is 0 when under the limit")
+        for sample in self.get_metrics(
+                "vectorized_kafka_quotas_client_quota_throttle_time_sum"):
+            check_sample(sample, sample.value == 0)
+
+        self.redpanda.logger.debug(
+            "Assert that throughput is recorded with the expected labels")
+        for sample in self.get_metrics("client_quota_throughput_sum"):
+            all_expected_metrics = ExpectedMetrics(
+                expected_pm_metrics.metrics + expected_tput_metrics.metrics)
+            if all_expected_metrics.has_matching(sample.labels):
+                check_sample(sample, sample.value > 0)
+            else:
+                check_sample(sample, sample.value == 0)
+
+        self.redpanda.logger.debug("Produce over the limit")
+        self.produce(producer_with_group,
+                     self.break_group_quota_message_amount)
+        self.fetch(consumer_with_group, self.break_group_quota_message_amount)
+        self.produce(unknown_producer, self.break_default_quota_message_amount)
+        self.fetch(unknown_consumer, self.break_default_quota_message_amount)
+
+        self.redpanda.logger.debug(
+            "Assert that throttling time is positive when over the limit")
+        for sample in self.get_metrics("client_quota_throttle_time_sum"):
+            if expected_tput_metrics.has_matching(sample.labels):
+                check_sample(sample, sample.value > 0)
+            else:
+                check_sample(sample, sample.value == 0)
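A possible follow-up check, not part of this patch: because the throttle-time metric is a histogram, its `_sum` and `_count` series can be combined into a mean delay per label pair, reusing the get_metrics helper above. The helper name and the aggregation below are hypothetical; they only illustrate how the new series could be consumed from a test.

```python
from collections import defaultdict


def mean_throttle_seconds(self) -> dict[tuple, float]:
    """Hypothetical test helper: mean throttle delay per (rule, type) pair."""
    sums: dict[tuple, float] = defaultdict(float)
    counts: dict[tuple, float] = defaultdict(float)
    for s in self.get_metrics("client_quota_throttle_time_sum"):
        key = (s.labels.get('redpanda_quota_rule'),
               s.labels.get('redpanda_quota_type'))
        sums[key] += s.value
    for s in self.get_metrics("client_quota_throttle_time_count"):
        key = (s.labels.get('redpanda_quota_rule'),
               s.labels.get('redpanda_quota_type'))
        counts[key] += s.value
    # Mean delay in seconds for every label pair that has observations.
    return {k: sums[k] / counts[k] for k in sums if counts[k] > 0}
```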