Skip to content

Commit

Permalink
Merge pull request #20834 from mmaslankaprv/fix-20315-2
Browse files Browse the repository at this point in the history
tests: check progress in exactly once delivery test
  • Loading branch information
mmaslankaprv authored Jul 4, 2024
2 parents 5e277ca + babb227 commit bb612ad
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions tests/rptest/transactions/tx_atomic_produce_consume_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self,
tx_id: str = "transformer-tx",
group_id: str = "transformer-group",
commit_every: int = 50,
timeout_sec: int = 60):
progress_timeout_sec: int = 60):
self._src_topic = src_topic
self._partition_count = partition_count
self._dst_topic = dst_topic
Expand All @@ -58,7 +58,7 @@ def __init__(self,
self._produced_to_src = 0
self._lock = threading.Lock()
self._tasks = []
self._timeout_sec = timeout_sec
self._progress_timeout_sec = progress_timeout_sec
self._error = None
self._finished_partitions = set()
self._consumer_cnt = 0
Expand Down Expand Up @@ -136,7 +136,7 @@ def get_high_watermarks(self, topic):
return {p.id: p.high_watermark for p in rpk.describe_topic(topic)}

def tx_transform(self, tx_id):
start = time.time()
last_progress = time.time()
self._logger.info(f"Starting tx-transform with tx_id: {tx_id}")
producer = ck.Producer({
'bootstrap.servers': self._redpanda.brokers(),
Expand All @@ -162,17 +162,34 @@ def tx_transform(self, tx_id):
try:
with self._lock:
self._consumer_cnt += 1
last_positions = {}

def reached_end():
def made_progress(current_positions: dict):
self._logger.info(
f"[{tx_id}] last positions: {last_positions}, current positions: {current_positions}"
)
for p_id, pos in current_positions.items():
if p_id in last_positions and pos > last_positions[p_id]:
return True
elif p_id not in last_positions:
return True
return False

def get_consumer_positions() -> list:
assignments = {tp.partition for tp in consumer.assignment()}
if len(assignments) == 0:
return []
return consumer.position([
ck.TopicPartition(self._src_topic, p) for p in assignments
])

def reached_end(positions):
high_watermarks = self.get_high_watermarks(self._src_topic)

assignments = {tp.partition for tp in consumer.assignment()}
if len(assignments) == 0:
return False

positions = consumer.position([
ck.TopicPartition(self._src_topic, p) for p in assignments
])
end_for = {
p.partition
for p in positions if p.partition in high_watermarks
Expand Down Expand Up @@ -237,12 +254,19 @@ def on_revoke(consumer, partitions):
consumer.consumer_group_metadata())
producer.commit_transaction()
in_transaction = False
positions = get_consumer_positions()

if reached_end():
if reached_end(positions):
self._logger.info(f"{tx_id} reached end")
break
positions_dict = {p.partition: p.offset for p in positions}

if made_progress(positions_dict):
last_progress = time.time()

last_positions = positions_dict

if time.time() - start > self._timeout_sec:
if time.time() - last_progress > self._progress_timeout_sec:
self._logger.error(f"timeout waiting for {tx_id} producer")
self._error = TimeoutError(
"timeout waiting for {tx_id} producer")
Expand Down Expand Up @@ -366,7 +390,7 @@ def simple_transform(k, v):
simple_transform,
self.logger,
2000,
timeout_sec=180)
progress_timeout_sec=180)

transformer.start_workload(2)
transformer.wait_for_finish()
Expand Down

0 comments on commit bb612ad

Please sign in to comment.