Skip to content

Commit

Permalink
dt/xform: Add tests for 'from-offset' option
Browse files Browse the repository at this point in the history
- Consume records that were produced before the deploy
- Specify offsets that run off the end of the input topic
- Ill-formed offsets

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
  • Loading branch information
oleiman committed Jul 2, 2024
1 parent f725aaf commit e1c3d29
Showing 1 changed file with 75 additions and 2 deletions.
77 changes: 75 additions & 2 deletions tests/rptest/tests/data_transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def _deploy_wasm(self,
compression_type: TopicSpec.CompressionTypes
| None = None,
wait_running: bool = True,
retry_on_exc: bool = True):
retry_on_exc: bool = True,
from_offset: str | None = None):
"""
Deploy a wasm transform and wait for all processors to be running.
"""
Expand All @@ -74,7 +75,8 @@ def do_deploy():
input_topic.name,
[o.name for o in output_topic],
file=file,
compression_type=compression_type)
compression_type=compression_type,
from_offset=from_offset)
return True

wait_until(
Expand Down Expand Up @@ -474,6 +476,77 @@ def compression_set(compression_type: TopicSpec.CompressionTypes):
self.logger.info(f"{consumer_status}")
assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}"

@cluster(num_nodes=4)
@matrix(offset=["+0", "-1", f"@{int(time.time() * 1000)}"])
def test_consume_from_offset(self, offset):
'''
Verify that offset-delta based and timestamp based consumption works as expected.
That is, records produced prior to deployment should still be accessible given an
appropriate offset config.
'''
input_topic = self.topics[0]
output_topic = self.topics[1]
producer_status = self._produce_input_topic(topic=self.topics[0])
self._deploy_wasm(name="identity-xform",
input_topic=input_topic,
output_topic=output_topic,
from_offset=offset,
wait_running=True)
consumer_status = self._consume_output_topic(topic=self.topics[1],
status=producer_status)

self.logger.info(f"{consumer_status}")
assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}"

@cluster(num_nodes=4)
@matrix(offset=[
None, # No offest -> read from the end of the topic
"@33276193569000", # June 3024
"-0", # '0 from end' should commit 'latest'
])
def test_consume_off_end(self, offset):
'''
Verify that consuming off the end of the input topic works as expected.
That is, confirm that records produced _prior_ to deployment do not reach the transform.
'''
input_topic = self.topics[0]
output_topic = self.topics[1]
producer_status = self._produce_input_topic(topic=self.topics[0])
self._deploy_wasm(name="identity-xform",
input_topic=input_topic,
output_topic=output_topic,
from_offset=offset,
wait_running=True)

with expect_exception(TimeoutError, lambda _: True):
_ = self._consume_output_topic(topic=self.topics[1],
status=producer_status)

@cluster(num_nodes=3)
@matrix(offset=[
"@9223372036854775807", # int64_max (out of range for millis)
"+NaN", # lexical cast error (literal NaN)
"-9223372036854775808", # lexical cast error (int64 overflow)
f"@{time.time() * 1000}", # lexical cast error (float value)
"@-10", # illegal negative value
"--10", # illegal negative value
"+-10", # illegal negative value
])
def test_consume_junk_off(self, offset):
'''
Tests for junk data. Deployment should fail cleanly in the admin API or rpk.
'''
input_topic = self.topics[0]
output_topic = self.topics[1]
with expect_exception(RpkException,
lambda e: print(e) or "Bad offset" in str(e)):
self._deploy_wasm(name="identity-xform",
input_topic=input_topic,
output_topic=output_topic,
from_offset=offset,
wait_running=False,
retry_on_exc=False)


class DataTransformsChainingTest(BaseDataTransformsTest):
"""
Expand Down

0 comments on commit e1c3d29

Please sign in to comment.