diff --git a/src/v/transform/api.cc b/src/v/transform/api.cc
index 3f3ef5308b78e..d91432a1db6ff 100644
--- a/src/v/transform/api.cc
+++ b/src/v/transform/api.cc
@@ -147,7 +147,7 @@ class partition_source final : public source {
         return model::offset_cast(model::prev_offset(result.value()));
     }
 
-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
         auto result = co_await _partition.timequery(storage::timequery_config(
           _partition.start_offset(),
@@ -158,7 +158,7 @@ class partition_source final : public source {
           /*as=*/*as,
           /*client_addr=*/std::nullopt));
         if (!result.has_value()) {
-            co_return kafka::offset::min();
+            co_return std::nullopt;
         }
         co_return model::offset_cast(result->offset);
     }
diff --git a/src/v/transform/io.h b/src/v/transform/io.h
index d1e20da3d8bf4..757cd81b46294 100644
--- a/src/v/transform/io.h
+++ b/src/v/transform/io.h
@@ -55,9 +55,10 @@ class source {
 
     /**
      * The offset of a record the log for a given timestamp - if the log is
-     * empty then `kafka::offset::min()` is returned.
+     * empty or the timestamp is greater than max_timestamp, then std::nullopt
+     * is returned.
      */
-    virtual ss::future<kafka::offset>
+    virtual ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp, ss::abort_source*) = 0;
 
     /**
diff --git a/src/v/transform/tests/test_fixture.cc b/src/v/transform/tests/test_fixture.cc
index a7867b22c3855..59c27f02c5f6c 100644
--- a/src/v/transform/tests/test_fixture.cc
+++ b/src/v/transform/tests/test_fixture.cc
@@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() {
     return _batches.rbegin()->first;
 }
 
-ss::future<kafka::offset>
+ss::future<std::optional<kafka::offset>>
 fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
     // Walk through the batches from most recent and look for the first batch
     // where the timestamp bounds is inclusive of `ts`.
@@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
             co_return model::offset_cast(batch.second.header().last_offset());
         }
     }
-    co_return kafka::offset{};
+    co_return std::nullopt;
 }
 
 ss::future
diff --git a/src/v/transform/tests/test_fixture.h b/src/v/transform/tests/test_fixture.h
index 5f8ed596b9278..eed3f027efcff 100644
--- a/src/v/transform/tests/test_fixture.h
+++ b/src/v/transform/tests/test_fixture.h
@@ -77,7 +77,7 @@ class fake_source : public source {
     ss::future<> start() override;
     ss::future<> stop() override;
     kafka::offset latest_offset() override;
-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp, ss::abort_source*) override;
     ss::future
     read_batch(kafka::offset offset, ss::abort_source* as) override;
diff --git a/src/v/transform/transform_processor.cc b/src/v/transform/transform_processor.cc
index a4e2e5e86e988..e138a0ed0c261 100644
--- a/src/v/transform/transform_processor.cc
+++ b/src/v/transform/transform_processor.cc
@@ -216,13 +216,20 @@ processor::load_latest_committed() {
               vlog(_logger.debug, "starting at latest: {}", latest);
               return ssx::now(latest);
           },
-          [this](model::timestamp ts) {
+          [this, &latest](model::timestamp ts) {
              vlog(_logger.debug, "starting at timestamp: {}", ts);
              // We want to *start at* this timestamp, so record that
              // we're going to commit progress at the offset before, so
-             // we start inclusive of this offset.
+             // we start inclusive of this offset. If nothing has been
+             // committed since the start timestamp, commit progress at
+             // latest (i.e. start from the end)
              return _source->offset_at_timestamp(ts, &_as).then(
-               kafka::prev_offset);
+               [&latest](std::optional<kafka::offset> o)
+                 -> ss::future<kafka::offset> {
+                   return ssx::now(
+                     o.has_value() ? kafka::prev_offset(o.value())
+                                   : latest);
+               });
          });
     vlog(
       _logger.debug, "resolved start offset: {}", *initial_offset);
diff --git a/tests/rptest/tests/data_transforms_test.py b/tests/rptest/tests/data_transforms_test.py
index 838eafca8ae37..054a7e36bd4ca 100644
--- a/tests/rptest/tests/data_transforms_test.py
+++ b/tests/rptest/tests/data_transforms_test.py
@@ -398,6 +398,24 @@ def env_is(env: dict[str, str]):
                    err_msg="some partitions did not clear their envs",
                    retry_on_exc=True)
 
+    @cluster(num_nodes=4)
+    def test_consume_from_end(self):
+        """
+        Test that by default transforms read from the end of the topic if no records
+        are produced between deploy time and transform start time.
+        """
+        input_topic = self.topics[0]
+        output_topic = self.topics[1]
+        producer_status = self._produce_input_topic(topic=self.topics[0])
+        self._deploy_wasm(name="identity-xform",
+                          input_topic=input_topic,
+                          output_topic=output_topic,
+                          wait_running=True)
+
+        with expect_exception(TimeoutError, lambda _: True):
+            consumer_status = self._consume_output_topic(
+                topic=self.topics[1], status=producer_status)
+
 
 class DataTransformsChainingTest(BaseDataTransformsTest):
     """
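
For reference, below is a minimal standalone sketch (not the Redpanda source) of the start-offset fallback this patch introduces in processor::load_latest_committed: when the timequery returns std::nullopt, progress is committed at the latest offset so the transform starts from the end of the topic rather than from kafka::offset::min(). The names offset, prev_offset, and resolve_start_offset are illustrative stand-ins, not Redpanda APIs.

// Sketch only, under the assumptions stated above.
#include <cassert>
#include <cstdint>
#include <optional>

using offset = int64_t;

// Mirrors kafka::prev_offset: committing at the offset *before* the queried
// one makes consumption start inclusive of that offset.
constexpr offset prev_offset(offset o) { return o - 1; }

// If the timequery found a record at/after the start timestamp, start just
// before it; otherwise fall back to `latest` (i.e. read from the end).
constexpr offset
resolve_start_offset(std::optional<offset> timequery_result, offset latest) {
    return timequery_result.has_value() ? prev_offset(*timequery_result)
                                        : latest;
}

int main() {
    assert(resolve_start_offset(offset{42}, offset{100}) == 41);
    assert(resolve_start_offset(std::nullopt, offset{100}) == 100);
    return 0;
}

This is the behavior the new ducktape test exercises: with no records produced after deploy, the transform starts at the end of the input topic and the output-topic consumer is expected to time out.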