diff --git a/tests/rptest/tests/data_transforms_test.py b/tests/rptest/tests/data_transforms_test.py index 7b01e7f847ed5..638929e39699d 100644 --- a/tests/rptest/tests/data_transforms_test.py +++ b/tests/rptest/tests/data_transforms_test.py @@ -62,7 +62,8 @@ def _deploy_wasm(self, compression_type: TopicSpec.CompressionTypes | None = None, wait_running: bool = True, - retry_on_exc: bool = True): + retry_on_exc: bool = True, + from_offset: str | None = None): """ Deploy a wasm transform and wait for all processors to be running. """ @@ -74,7 +75,8 @@ def do_deploy(): input_topic.name, [o.name for o in output_topic], file=file, - compression_type=compression_type) + compression_type=compression_type, + from_offset=from_offset) return True wait_until( @@ -474,6 +476,77 @@ def compression_set(compression_type: TopicSpec.CompressionTypes): self.logger.info(f"{consumer_status}") assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}" + @cluster(num_nodes=4) + @matrix(offset=["+0", "-1", f"@{int(time.time() * 1000)}"]) + def test_consume_from_offset(self, offset): + ''' + Verify that offset-delta based and timestamp based consumption works as expected. + That is, records produced prior to deployment should still be accessible given an + appropriate offset config. + ''' + input_topic = self.topics[0] + output_topic = self.topics[1] + producer_status = self._produce_input_topic(topic=self.topics[0]) + self._deploy_wasm(name="identity-xform", + input_topic=input_topic, + output_topic=output_topic, + from_offset=offset, + wait_running=True) + consumer_status = self._consume_output_topic(topic=self.topics[1], + status=producer_status) + + self.logger.info(f"{consumer_status}") + assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}" + + @cluster(num_nodes=4) + @matrix(offset=[ + None, # No offest -> read from the end of the topic + "@33276193569000", # June 3024 + "-0", # '0 from end' should commit 'latest' + ]) + def test_consume_off_end(self, offset): + ''' + Verify that consuming off the end of the input topic works as expected. + That is, confirm that records produced _prior_ to deployment do not reach the transform. + ''' + input_topic = self.topics[0] + output_topic = self.topics[1] + producer_status = self._produce_input_topic(topic=self.topics[0]) + self._deploy_wasm(name="identity-xform", + input_topic=input_topic, + output_topic=output_topic, + from_offset=offset, + wait_running=True) + + with expect_exception(TimeoutError, lambda _: True): + _ = self._consume_output_topic(topic=self.topics[1], + status=producer_status) + + @cluster(num_nodes=3) + @matrix(offset=[ + "@9223372036854775807", # int64_max (out of range for millis) + "+NaN", # lexical cast error (literal NaN) + "-9223372036854775808", # lexical cast error (int64 overflow) + f"@{time.time() * 1000}", # lexical cast error (float value) + "@-10", # illegal negative value + "--10", # illegal negative value + "+-10", # illegal negative value + ]) + def test_consume_junk_off(self, offset): + ''' + Tests for junk data. Deployment should fail cleanly in the admin API or rpk. + ''' + input_topic = self.topics[0] + output_topic = self.topics[1] + with expect_exception(RpkException, + lambda e: print(e) or "Bad offset" in str(e)): + self._deploy_wasm(name="identity-xform", + input_topic=input_topic, + output_topic=output_topic, + from_offset=offset, + wait_running=False, + retry_on_exc=False) + class DataTransformsChainingTest(BaseDataTransformsTest): """