Skip to content

Commit

Permalink
dt/quotas: test client quota metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pgellert committed Jun 19, 2024
1 parent fb8269e commit c53cb02
Showing 1 changed file with 117 additions and 1 deletion.
118 changes: 117 additions & 1 deletion tests/rptest/tests/cluster_quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import time
import random
import string
from typing import NamedTuple, Optional

from ducktape.utils.util import wait_until
from rptest.clients.rpk import RpkTool
from rptest.services.redpanda import ResourceSettings, LoggingConfig
from rptest.services.redpanda import MetricSample, MetricSamples, MetricsEndpoint, ResourceSettings, LoggingConfig
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from rptest.clients.kcl import RawKCL, KclCreateTopicsRequestTopic, \
KclCreatePartitionsRequestTopic
Expand All @@ -25,6 +26,20 @@
GB = 1_000_000_000


class ExpectedMetric(NamedTuple):
    """A single expected metric series, identified by its set of labels."""

    # Label name -> label value. ExpectedMetrics.has_matching compares this
    # dict for equality against the labels of an observed sample.
    labels: dict[str, str]


class ExpectedMetrics(NamedTuple):
    """A group of expected metrics that observed samples are checked against."""

    # Every entry contributes one label set that counts as "expected".
    metrics: list[ExpectedMetric]

    def has_matching(self, got_labels: dict[str, str]) -> bool:
        """
        Tell whether one of the expected metrics has exactly these labels.

        :param got_labels: labels of the sample being checked
        :return: True if the labels of at least one expected metric compare
                 equal to got_labels; False otherwise (also when the list of
                 expected metrics is empty)
        """
        for candidate in self.metrics:
            if candidate.labels == got_labels:
                return True
        return False


class ClusterQuotaPartitionMutationTest(RedpandaTest):
"""
Ducktape tests for partition mutation quota
Expand Down Expand Up @@ -517,3 +532,104 @@ def test_throttling_ms_enforcement_is_per_connection(self):
timeout_sec=10,
err_msg="Subsequent messages should be throttled broker-side",
)

def get_metrics(self, metric: str) -> list[MetricSample]:
    """
    Sample a metric from the METRICS endpoint and return its samples.

    :param metric: metric name handed to redpanda.metrics_sample
    :return: the `samples` attribute of the sampling result
    :raises AssertionError: if the sampling result is falsy
    """
    sampled = self.redpanda.metrics_sample(
        metric, metrics_endpoint=MetricsEndpoint.METRICS)

    assert sampled, f"Metric is missing: {metric}"

    # Bind the samples once so the logged value is the returned value.
    found = sampled.samples
    self.logger.debug(f"Samples for {metric}: {found}")
    return found

@cluster(num_nodes=1)
def test_client_quota_metrics(self):
    """
    Check the client quota throughput and throttle-time metrics.

    Steps:
      1. Configure group produce/fetch byte-rate quotas for the
         "producer_with_group" / "consumer_with_group" client prefixes
         and default produce/fetch byte-rate quotas.
      2. Produce/fetch with an amount of 1 for each client, then assert
         that every throttle-time sample is 0 and that throughput is
         positive exactly for the expected label sets (0 for any other).
      3. Produce/fetch the quota-breaking amounts, then assert that
         throttle time is positive exactly for the produce/fetch label
         sets (0 for any other).
    """
    self.init_test_data()
    self.redpanda.set_cluster_config({
        "kafka_client_group_byte_rate_quota": {
            "group_name": "producer_with_group",
            "clients_prefix": "producer_with_group",
            "quota": self.target_group_quota_byte_rate,
        },
        "kafka_client_group_fetch_byte_rate_quota": {
            "group_name": "consumer_with_group",
            "clients_prefix": "consumer_with_group",
            "quota": self.target_group_quota_byte_rate,
        },
        "target_quota_byte_rate":
        self.target_default_quota_byte_rate,
        "target_fetch_quota_byte_rate":
        self.target_default_quota_byte_rate,
    })

    producer_with_group = self.make_producer("producer_with_group")
    consumer_with_group = self.make_consumer("consumer_with_group")
    unknown_producer = self.make_producer("unknown_producer")
    unknown_consumer = self.make_consumer("unknown_consumer")

    # When the test topic is created, throughput for this metric is recorded
    expected_pm_metrics = ExpectedMetrics([
        ExpectedMetric({
            'redpanda_quota_rule': 'not_applicable',
            'redpanda_quota_type': 'partition_mutation_quota',
        }),
    ])

    # When we produce/fetch to/from the cluster, the metrics with these
    # labels should update
    expected_tput_metrics = ExpectedMetrics([
        ExpectedMetric({
            'redpanda_quota_rule': 'cluster_client_prefix',
            'redpanda_quota_type': 'produce_quota',
        }),
        ExpectedMetric({
            'redpanda_quota_rule': 'cluster_client_prefix',
            'redpanda_quota_type': 'fetch_quota',
        }),
        ExpectedMetric({
            'redpanda_quota_rule': 'cluster_client_default',
            'redpanda_quota_type': 'produce_quota',
        }),
        ExpectedMetric({
            'redpanda_quota_rule': 'cluster_client_default',
            'redpanda_quota_type': 'fetch_quota',
        }),
    ])

    # Union of both groups; built once because it does not depend on the
    # sample being inspected.
    all_expected_metrics = ExpectedMetrics(expected_pm_metrics.metrics +
                                           expected_tput_metrics.metrics)

    def check_sample(sample: MetricSample, assertion: bool):
        # Include the offending sample in the failure message.
        assert assertion, f"Unexpected sample: {sample}."

    self.redpanda.logger.debug("Produce under the limit")
    self.produce(producer_with_group, 1)
    self.fetch(consumer_with_group, 1)
    self.produce(unknown_producer, 1)
    self.fetch(unknown_consumer, 1)

    self.redpanda.logger.debug(
        "Assert that throttling time is 0 when under the limit")
    # NOTE(review): this lookup uses the fully prefixed metric name while
    # the over-the-limit check below uses the short name
    # "client_quota_throttle_time_sum" -- presumably both resolve to the
    # same metric; confirm and unify.
    for sample in self.get_metrics(
            "vectorized_kafka_quotas_client_quota_throttle_time_sum"):
        check_sample(sample, sample.value == 0)

    self.redpanda.logger.debug(
        "Assert that throughput is recorded with the expected labels")
    for sample in self.get_metrics("client_quota_throughput_sum"):
        if all_expected_metrics.has_matching(sample.labels):
            check_sample(sample, sample.value > 0)
        else:
            check_sample(sample, sample.value == 0)

    self.redpanda.logger.debug("Produce over the limit")
    self.produce(producer_with_group,
                 self.break_group_quota_message_amount)
    self.fetch(consumer_with_group, self.break_group_quota_message_amount)
    self.produce(unknown_producer, self.break_default_quota_message_amount)
    self.fetch(unknown_consumer, self.break_default_quota_message_amount)

    self.redpanda.logger.debug(
        "Assert that throttling time is positive when over the limit")
    for sample in self.get_metrics("client_quota_throttle_time_sum"):
        # Only produce/fetch quotas were exceeded, so only their label
        # sets are expected to report throttle time.
        if expected_tput_metrics.has_matching(sample.labels):
            check_sample(sample, sample.value > 0)
        else:
            check_sample(sample, sample.value == 0)

0 comments on commit c53cb02

Please sign in to comment.