diff --git a/tests/rptest/transactions/tx_atomic_produce_consume_test.py b/tests/rptest/transactions/tx_atomic_produce_consume_test.py index ced1dcbf997e8..32a68172d6058 100644 --- a/tests/rptest/transactions/tx_atomic_produce_consume_test.py +++ b/tests/rptest/transactions/tx_atomic_produce_consume_test.py @@ -43,7 +43,7 @@ def __init__(self, tx_id: str = "transformer-tx", group_id: str = "transformer-group", commit_every: int = 50, - timeout_sec: int = 60): + progress_timeout_sec: int = 60): self._src_topic = src_topic self._partition_count = partition_count self._dst_topic = dst_topic @@ -58,7 +58,7 @@ def __init__(self, self._produced_to_src = 0 self._lock = threading.Lock() self._tasks = [] - self._timeout_sec = timeout_sec + self._progress_timeout_sec = progress_timeout_sec self._error = None self._finished_partitions = set() self._consumer_cnt = 0 @@ -136,7 +136,7 @@ def get_high_watermarks(self, topic): return {p.id: p.high_watermark for p in rpk.describe_topic(topic)} def tx_transform(self, tx_id): - start = time.time() + last_progress = time.time() self._logger.info(f"Starting tx-transform with tx_id: {tx_id}") producer = ck.Producer({ 'bootstrap.servers': self._redpanda.brokers(), @@ -162,17 +162,34 @@ def tx_transform(self, tx_id): try: with self._lock: self._consumer_cnt += 1 + last_positions = {} - def reached_end(): + def made_progress(current_positions: dict): + self._logger.info( + f"[{tx_id}] last positions: {last_positions}, current positions: {current_positions}" + ) + for p_id, pos in current_positions.items(): + if p_id in last_positions and pos > last_positions[p_id]: + return True + elif p_id not in last_positions: + return True + return False + + def get_consumer_positions() -> list: + assignments = {tp.partition for tp in consumer.assignment()} + if len(assignments) == 0: + return [] + return consumer.position([ + ck.TopicPartition(self._src_topic, p) for p in assignments + ]) + + def reached_end(positions): high_watermarks = self.get_high_watermarks(self._src_topic) assignments = {tp.partition for tp in consumer.assignment()} if len(assignments) == 0: return False - positions = consumer.position([ - ck.TopicPartition(self._src_topic, p) for p in assignments - ]) end_for = { p.partition for p in positions if p.partition in high_watermarks @@ -237,12 +254,19 @@ def on_revoke(consumer, partitions): consumer.consumer_group_metadata()) producer.commit_transaction() in_transaction = False + positions = get_consumer_positions() - if reached_end(): + if reached_end(positions): self._logger.info(f"{tx_id} reached end") break + positions_dict = {p.partition: p.offset for p in positions} + + if made_progress(positions_dict): + last_progress = time.time() + + last_positions = positions_dict - if time.time() - start > self._timeout_sec: + if time.time() - last_progress > self._progress_timeout_sec: self._logger.error(f"timeout waiting for {tx_id} producer") self._error = TimeoutError( "timeout waiting for {tx_id} producer") @@ -366,7 +390,7 @@ def simple_transform(k, v): simple_transform, self.logger, 2000, - timeout_sec=180) + progress_timeout_sec=180) transformer.start_workload(2) transformer.wait_for_finish()