Skip to content

Commit

Permalink
tests: added test validating consuming from empty log
Browse files Browse the repository at this point in the history
Added test that validates if a consumer is able to continue consuming a
log that has been completely removed by delete retention.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
(cherry picked from commit 89a8d97)
  • Loading branch information
mmaslankaprv authored and Michal Maslanka committed Jul 20, 2023
1 parent f6e3532 commit edf37e9
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions tests/rptest/tests/log_segment_ms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.kafka_cli_consumer import KafkaCliConsumer

from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.services.verifiable_producer import VerifiableProducer
from rptest.tests.redpanda_test import RedpandaTest
Expand Down Expand Up @@ -268,3 +270,93 @@ def test_segment_rolling_with_retention(self):
timeout_sec=120,
err_msg=f"Failed to consume messages")
consumer.stop()

@cluster(num_nodes=4)
def test_segment_rolling_with_retention_consumer(self):
self.redpanda.set_cluster_config({
"log_segment_ms": None,
"log_segment_ms_min": 10000
})
topic = TopicSpec(segment_bytes=(1024 * 1024),
replication_factor=1,
partition_count=1)

self.client().create_topic(topic)

producer = VerifiableProducer(context=self.test_context,
num_nodes=1,
redpanda=self.redpanda,
topic=topic.name,
throughput=10000)

producer.start()
wait_until(
lambda: self._total_segments_count(topic) >= 5,
timeout_sec=120,
err_msg=
"producer failed to produce enough messages to create 5 segments")
# stop producer
producer.stop()
producer.clean()
producer.free()
del producer
start_count = self._total_segments_count(topic)
self.client().alter_topic_config(topic.name, "segment.ms", "15000")

# wait for the segment.ms policy to roll the segment
wait_until(lambda: self._total_segments_count(topic) > start_count,
timeout_sec=60,
err_msg=f"failed waiting for the segment to roll")
group_id = "test-group"

consumer = VerifiableConsumer(context=self.test_context,
num_nodes=1,
redpanda=self.redpanda,
topic=topic.name,
group_id=group_id)
consumer.start()

self.client().alter_topic_config(topic.name,
"retention.local.target.ms", "10000")
self.client().alter_topic_config(topic.name, "retention.ms", "10000")

# wait for retention policy to trigger
wait_until(lambda: self._total_segments_count(topic) <= 1,
timeout_sec=60,
err_msg=f"failed waiting for retention policy")
consumer.stop()

producer = VerifiableProducer(context=self.test_context,
num_nodes=1,
redpanda=self.redpanda,
topic=topic.name,
throughput=10000)
producer.start()
wait_until(
lambda: self._total_segments_count(topic) >= 2,
timeout_sec=120,
err_msg=
f"producer failed to produce enough messages to create 5 segments")
producer.stop()

rpk = RpkTool(self.redpanda)

def no_lag_present():
group = rpk.group_describe(group_id)
self.logger.info(f"group: {group}")
if len(group.partitions) != topic.partition_count:
return False

return all([p.lag == 0 for p in group.partitions])

consumer_2 = KafkaCliConsumer(context=self.test_context,
redpanda=self.redpanda,
topic=topic.name,
group=group_id)
consumer_2.start()
# # Wait for any messages to be consumed,
# # (if there is an issue in the offsets handling it will result in consumer being stuck)
wait_until(no_lag_present,
timeout_sec=120,
err_msg=f"Failed to consume all messages from {topic.name}")
consumer_2.stop()

0 comments on commit edf37e9

Please sign in to comment.