rptest: add test_read_distribution_metric
WillemKauf committed Jun 5, 2024
1 parent 8884426 commit b91584f
Showing 1 changed file with 101 additions and 0 deletions: tests/rptest/tests/read_distribution_test.py
@@ -0,0 +1,101 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import bisect
import time
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.redpanda import MetricsEndpoint
from rptest.tests.redpanda_test import RedpandaTest


class ReadDistributionTest(RedpandaTest):
def __init__(self, test_context):
super().__init__(test_context=test_context, num_brokers=1)

self.topic_name = 'tapioca'
self.topics = [TopicSpec(name=self.topic_name, replication_factor=1)]

self.record_size = 1024
self.record_count = 10
self.records_per_batch = 2
        self.timespan_ms = 24 * 60 * 60 * 1000  # One day in ms.
self.timestamp_step_ms = self.timespan_ms // (self.record_count - 1)
self.curr_ts = round(time.time() * 1000)
self.base_ts = self.curr_ts - self.timespan_ms

@cluster(num_nodes=2)
def test_read_distribution_metric(self):
'''
        Validate that Kafka reads are correctly recorded in the read
        distribution histogram.
'''
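        # Produce records with evenly spaced fake timestamps spanning the
        # last day, from base_ts up to the current time.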
KgoVerifierProducer.oneshot(
self.test_context,
self.redpanda,
self.topic_name,
self.record_size,
self.record_count,
batch_max_bytes=self.record_size * self.records_per_batch - 1,
fake_timestamp_ms=self.base_ts,
fake_timestamp_step_ms=self.timestamp_step_ms)
kcat = KafkaCat(self.redpanda)

        # The timestamps with which records were produced, and from which
        # we will consume below.
timestamps_ms = [
self.base_ts + i * self.timestamp_step_ms
for i in range(0, self.record_count)
]

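        # Consume one record at each timestamp; each such timequery read
        # should be counted in the topic's read distribution histogram.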
for timestamp in timestamps_ms:
kcat.consume_one(self.topic_name,
0,
offset=None,
first_timestamp=timestamp)

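        # Scrape the public metrics endpoint and extract the cumulative
        # histogram samples (value, upper bound 'le') for our topic.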
metrics = self.redpanda.metrics(self.redpanda.nodes[0],
MetricsEndpoint.PUBLIC_METRICS)
read_dist_metric = [
m for m in metrics if m.name == 'redpanda_kafka_read_distribution'
][0]
cumulative_topic_read_dist = [
(s.value, s.labels['le']) for s in read_dist_metric.samples if
s.labels['redpanda_topic'] == self.topic_name and 'le' in s.labels
]

num_expected_buckets = 16
        # There should be 16 + 1 buckets in the read distribution histogram
        # (including the +Inf bucket).
        assert len(cumulative_topic_read_dist) == num_expected_buckets + 1

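        # Prometheus histogram buckets are cumulative: the sample for le=N
        # counts every read at or below that upper bound. Differencing
        # consecutive buckets recovers per-bucket counts.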
        prev = 0
        topic_read_dist = {}
        for v, le in cumulative_topic_read_dist:
            try:
                topic_read_dist[int(float(le))] = int(v - prev)
                prev = v
            except (ValueError, OverflowError):
                # The +Inf bucket's 'le' label cannot be converted to int.
                continue

bucket_list = list(topic_read_dist.keys())
        # There should be 16 buckets in bucket_list (after filtering out the
        # +Inf bucket).
        assert len(bucket_list) == num_expected_buckets

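        # Recompute the expected per-bucket counts from the timestamps we
        # produced, mirroring the histogram's bucketing of read age in
        # minutes.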
expected_read_dist = {bucket: 0 for bucket in topic_read_dist.keys()}
for timestamp_ms in timestamps_ms:
delta_minutes = round((self.curr_ts - timestamp_ms) / (1000 * 60))

            # Find the bucket into which the timestamp delta in minutes
            # falls, using lower bound (bisect_left).
            bucket = bucket_list[bisect.bisect_left(bucket_list,
                                                    delta_minutes)]
expected_read_dist[bucket] += 1

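        # The observed per-bucket read counts should exactly match the
        # expected distribution.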
        for bucket, count in expected_read_dist.items():
            assert topic_read_dist[bucket] == count, \
                f"bucket {bucket}: expected {count} reads, got {topic_read_dist[bucket]}"
