From edf37e96d2c32f0b0531d690e9cdac5deca93de1 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 3 Jul 2023 15:21:02 +0200 Subject: [PATCH] tests: added test validating consuming from empty log Added test that validates if a consumer is able to continue consuming a log that has been completely removed by delete retention. Signed-off-by: Michal Maslanka (cherry picked from commit 89a8d9702ec662134fda60399464753b36a1acd4) --- tests/rptest/tests/log_segment_ms_test.py | 92 +++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/rptest/tests/log_segment_ms_test.py b/tests/rptest/tests/log_segment_ms_test.py index abbbff0c114d2..4f53ec4b88c22 100644 --- a/tests/rptest/tests/log_segment_ms_test.py +++ b/tests/rptest/tests/log_segment_ms_test.py @@ -6,6 +6,8 @@ from rptest.clients.rpk import RpkTool from rptest.clients.types import TopicSpec from rptest.services.cluster import cluster +from rptest.services.kafka_cli_consumer import KafkaCliConsumer + from rptest.services.verifiable_consumer import VerifiableConsumer from rptest.services.verifiable_producer import VerifiableProducer from rptest.tests.redpanda_test import RedpandaTest @@ -268,3 +270,93 @@ def test_segment_rolling_with_retention(self): timeout_sec=120, err_msg=f"Failed to consume messages") consumer.stop() + + @cluster(num_nodes=4) + def test_segment_rolling_with_retention_consumer(self): + self.redpanda.set_cluster_config({ + "log_segment_ms": None, + "log_segment_ms_min": 10000 + }) + topic = TopicSpec(segment_bytes=(1024 * 1024), + replication_factor=1, + partition_count=1) + + self.client().create_topic(topic) + + producer = VerifiableProducer(context=self.test_context, + num_nodes=1, + redpanda=self.redpanda, + topic=topic.name, + throughput=10000) + + producer.start() + wait_until( + lambda: self._total_segments_count(topic) >= 5, + timeout_sec=120, + err_msg= + "producer failed to produce enough messages to create 5 segments") + # stop producer + producer.stop() + producer.clean() + producer.free() + del producer + start_count = self._total_segments_count(topic) + self.client().alter_topic_config(topic.name, "segment.ms", "15000") + + # wait for the segment.ms policy to roll the segment + wait_until(lambda: self._total_segments_count(topic) > start_count, + timeout_sec=60, + err_msg=f"failed waiting for the segment to roll") + group_id = "test-group" + + consumer = VerifiableConsumer(context=self.test_context, + num_nodes=1, + redpanda=self.redpanda, + topic=topic.name, + group_id=group_id) + consumer.start() + + self.client().alter_topic_config(topic.name, + "retention.local.target.ms", "10000") + self.client().alter_topic_config(topic.name, "retention.ms", "10000") + + # wait for retention policy to trigger + wait_until(lambda: self._total_segments_count(topic) <= 1, + timeout_sec=60, + err_msg=f"failed waiting for retention policy") + consumer.stop() + + producer = VerifiableProducer(context=self.test_context, + num_nodes=1, + redpanda=self.redpanda, + topic=topic.name, + throughput=10000) + producer.start() + wait_until( + lambda: self._total_segments_count(topic) >= 2, + timeout_sec=120, + err_msg= + f"producer failed to produce enough messages to create 5 segments") + producer.stop() + + rpk = RpkTool(self.redpanda) + + def no_lag_present(): + group = rpk.group_describe(group_id) + self.logger.info(f"group: {group}") + if len(group.partitions) != topic.partition_count: + return False + + return all([p.lag == 0 for p in group.partitions]) + + consumer_2 = KafkaCliConsumer(context=self.test_context, + redpanda=self.redpanda, + topic=topic.name, + group=group_id) + consumer_2.start() + # # Wait for any messages to be consumed, + # # (if there is an issue in the offsets handling it will result in consumer being stuck) + wait_until(no_lag_present, + timeout_sec=120, + err_msg=f"Failed to consume all messages from {topic.name}") + consumer_2.stop()