Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: check progress in exactly once delivery test #20834

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions tests/rptest/transactions/tx_atomic_produce_consume_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self,
tx_id: str = "transformer-tx",
group_id: str = "transformer-group",
commit_every: int = 50,
timeout_sec: int = 60):
progress_timeout_sec: int = 60):
self._src_topic = src_topic
self._partition_count = partition_count
self._dst_topic = dst_topic
Expand All @@ -58,7 +58,7 @@ def __init__(self,
self._produced_to_src = 0
self._lock = threading.Lock()
self._tasks = []
self._timeout_sec = timeout_sec
self._progress_timeout_sec = progress_timeout_sec
self._error = None
self._finished_partitions = set()
self._consumer_cnt = 0
Expand Down Expand Up @@ -136,7 +136,7 @@ def get_high_watermarks(self, topic):
return {p.id: p.high_watermark for p in rpk.describe_topic(topic)}

def tx_transform(self, tx_id):
start = time.time()
last_progress = time.time()
self._logger.info(f"Starting tx-transform with tx_id: {tx_id}")
producer = ck.Producer({
'bootstrap.servers': self._redpanda.brokers(),
Expand All @@ -162,17 +162,34 @@ def tx_transform(self, tx_id):
try:
with self._lock:
self._consumer_cnt += 1
last_positions = {}

def reached_end():
def made_progress(current_positions: dict):
self._logger.info(
f"[{tx_id}] last positions: {last_positions}, current positions: {current_positions}"
)
for p_id, pos in current_positions.items():
if p_id in last_positions and pos > last_positions[p_id]:
return True
elif p_id not in last_positions:
return True
return False

def get_consumer_positions() -> list:
assignments = {tp.partition for tp in consumer.assignment()}
if len(assignments) == 0:
return []
return consumer.position([
ck.TopicPartition(self._src_topic, p) for p in assignments
])

def reached_end(positions):
high_watermarks = self.get_high_watermarks(self._src_topic)

assignments = {tp.partition for tp in consumer.assignment()}
if len(assignments) == 0:
return False

positions = consumer.position([
ck.TopicPartition(self._src_topic, p) for p in assignments
])
end_for = {
p.partition
for p in positions if p.partition in high_watermarks
Expand Down Expand Up @@ -237,12 +254,19 @@ def on_revoke(consumer, partitions):
consumer.consumer_group_metadata())
producer.commit_transaction()
in_transaction = False
positions = get_consumer_positions()

if reached_end():
if reached_end(positions):
self._logger.info(f"{tx_id} reached end")
break
positions_dict = {p.partition: p.offset for p in positions}

if made_progress(positions_dict):
last_progress = time.time()

last_positions = positions_dict

if time.time() - start > self._timeout_sec:
if time.time() - last_progress > self._progress_timeout_sec:
self._logger.error(f"timeout waiting for {tx_id} producer")
self._error = TimeoutError(
"timeout waiting for {tx_id} producer")
Expand Down Expand Up @@ -366,7 +390,7 @@ def simple_transform(k, v):
simple_transform,
self.logger,
2000,
timeout_sec=180)
progress_timeout_sec=180)

transformer.start_workload(2)
transformer.wait_for_finish()
Expand Down
Loading