From babb2271636d20e654069bb510c5bc00930e9865 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 4 Jul 2024 09:54:22 +0000 Subject: [PATCH] tests: check progress in exactly once delivery test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of asserting when test took longer than given timout we now check if consumers made progress and fail the test only if no progress been made for configurable period of time. Fixes: #20315 Signed-off-by: Michał Maślanka --- .../tx_atomic_produce_consume_test.py | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/tests/rptest/transactions/tx_atomic_produce_consume_test.py b/tests/rptest/transactions/tx_atomic_produce_consume_test.py index ced1dcbf997e8..32a68172d6058 100644 --- a/tests/rptest/transactions/tx_atomic_produce_consume_test.py +++ b/tests/rptest/transactions/tx_atomic_produce_consume_test.py @@ -43,7 +43,7 @@ def __init__(self, tx_id: str = "transformer-tx", group_id: str = "transformer-group", commit_every: int = 50, - timeout_sec: int = 60): + progress_timeout_sec: int = 60): self._src_topic = src_topic self._partition_count = partition_count self._dst_topic = dst_topic @@ -58,7 +58,7 @@ def __init__(self, self._produced_to_src = 0 self._lock = threading.Lock() self._tasks = [] - self._timeout_sec = timeout_sec + self._progress_timeout_sec = progress_timeout_sec self._error = None self._finished_partitions = set() self._consumer_cnt = 0 @@ -136,7 +136,7 @@ def get_high_watermarks(self, topic): return {p.id: p.high_watermark for p in rpk.describe_topic(topic)} def tx_transform(self, tx_id): - start = time.time() + last_progress = time.time() self._logger.info(f"Starting tx-transform with tx_id: {tx_id}") producer = ck.Producer({ 'bootstrap.servers': self._redpanda.brokers(), @@ -162,17 +162,34 @@ def tx_transform(self, tx_id): try: with self._lock: self._consumer_cnt += 1 + last_positions = {} - def reached_end(): + def made_progress(current_positions: dict): + self._logger.info( + f"[{tx_id}] last positions: {last_positions}, current positions: {current_positions}" + ) + for p_id, pos in current_positions.items(): + if p_id in last_positions and pos > last_positions[p_id]: + return True + elif p_id not in last_positions: + return True + return False + + def get_consumer_positions() -> list: + assignments = {tp.partition for tp in consumer.assignment()} + if len(assignments) == 0: + return [] + return consumer.position([ + ck.TopicPartition(self._src_topic, p) for p in assignments + ]) + + def reached_end(positions): high_watermarks = self.get_high_watermarks(self._src_topic) assignments = {tp.partition for tp in consumer.assignment()} if len(assignments) == 0: return False - positions = consumer.position([ - ck.TopicPartition(self._src_topic, p) for p in assignments - ]) end_for = { p.partition for p in positions if p.partition in high_watermarks @@ -237,12 +254,19 @@ def on_revoke(consumer, partitions): consumer.consumer_group_metadata()) producer.commit_transaction() in_transaction = False + positions = get_consumer_positions() - if reached_end(): + if reached_end(positions): self._logger.info(f"{tx_id} reached end") break + positions_dict = {p.partition: p.offset for p in positions} + + if made_progress(positions_dict): + last_progress = time.time() + + last_positions = positions_dict - if time.time() - start > self._timeout_sec: + if time.time() - last_progress > self._progress_timeout_sec: self._logger.error(f"timeout waiting for {tx_id} producer") self._error = TimeoutError( "timeout waiting for {tx_id} producer") @@ -366,7 +390,7 @@ def simple_transform(k, v): simple_transform, self.logger, 2000, - timeout_sec=180) + progress_timeout_sec=180) transformer.start_workload(2) transformer.wait_for_finish()