diff --git a/tests/rptest/tests/read_distribution_test.py b/tests/rptest/tests/read_distribution_test.py new file mode 100644 index 0000000000000..d627d412d04f1 --- /dev/null +++ b/tests/rptest/tests/read_distribution_test.py @@ -0,0 +1,101 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import bisect +import time +from rptest.services.kgo_verifier_services import KgoVerifierProducer +from rptest.services.cluster import cluster +from rptest.clients.types import TopicSpec +from rptest.clients.kafka_cat import KafkaCat +from rptest.services.redpanda import MetricsEndpoint +from rptest.tests.redpanda_test import RedpandaTest + + +class ReadDistributionTest(RedpandaTest): + def __init__(self, test_context): + super().__init__(test_context=test_context, num_brokers=1) + + self.topic_name = 'tapioca' + self.topics = [TopicSpec(name=self.topic_name, replication_factor=1)] + + self.record_size = 1024 + self.record_count = 10 + self.records_per_batch = 2 + self.timespan_ms = 24 * 60 * 60 * 1000 #One day in ms. + self.timestamp_step_ms = self.timespan_ms // (self.record_count - 1) + self.curr_ts = round(time.time() * 1000) + self.base_ts = self.curr_ts - self.timespan_ms + + @cluster(num_nodes=2) + def test_read_distribution_metric(self): + ''' + Validate that Kafka reads are being correctly set in the read distribution histogram. + ''' + KgoVerifierProducer.oneshot( + self.test_context, + self.redpanda, + self.topic_name, + self.record_size, + self.record_count, + batch_max_bytes=self.record_size * self.records_per_batch - 1, + fake_timestamp_ms=self.base_ts, + fake_timestamp_step_ms=self.timestamp_step_ms) + kcat = KafkaCat(self.redpanda) + + # The list of timestamps to be produced/consumed from. + timestamps_ms = [ + self.base_ts + i * self.timestamp_step_ms + for i in range(0, self.record_count) + ] + + for timestamp in timestamps_ms: + kcat.consume_one(self.topic_name, + 0, + offset=None, + first_timestamp=timestamp) + + metrics = self.redpanda.metrics(self.redpanda.nodes[0], + MetricsEndpoint.PUBLIC_METRICS) + read_dist_metric = [ + m for m in metrics if m.name == 'redpanda_kafka_read_distribution' + ][0] + cumulative_topic_read_dist = [ + (s.value, s.labels['le']) for s in read_dist_metric.samples if + s.labels['redpanda_topic'] == self.topic_name and 'le' in s.labels + ] + + num_expected_buckets = 16 + # There should be 16 + 1 buckets in the read distribution histogram (including inf bucket) + assert len(cumulative_topic_read_dist) == num_expected_buckets + 1 + + prev = 0 + topic_read_dist = {} + for v, le in cumulative_topic_read_dist: + try: + topic_read_dist[int(float(le))] = int(v - prev) + prev = v + except: + continue + + bucket_list = list(topic_read_dist.keys()) + # There should be 16 buckets in the bucket_list (after filtering out inf bucket) + assert len(bucket_list) == num_expected_buckets + + expected_read_dist = {bucket: 0 for bucket in topic_read_dist.keys()} + for timestamp_ms in timestamps_ms: + delta_minutes = round((self.curr_ts - timestamp_ms) / (1000 * 60)) + + #Find the bucket into which the timestamp delta in minutes + #is going to fall, using lower bound. + bucket = bucket_list[bisect.bisect_left(bucket_list, + delta_minutes)] + expected_read_dist[bucket] += 1 + + for bucket, count in expected_read_dist.items(): + assert topic_read_dist[bucket] == count