From 5f01cf439c0e89a0d4aae2e1b361c810f85afcb4 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Sun, 23 Jun 2024 16:45:09 -0700 Subject: [PATCH 1/2] xform: Fix cold-start timequery bug Default behavior is for transforms to start processing input from an offset equivalent to "the latest committed offset at deploy time". However, in situations where no records are committed between the moment the transform was deployed and the moment the transform starts, the timequery servicing this logic returns an empty result, which is handled by returning offset::min, which signals the transform to begin processing at the beginning of the topic. This commit adjusts the logic in source::offset_at_timestamp to return nullopt for an empty timequery result, and the handling of that nullopt to resolve the transform start offset to latest. Signed-off-by: Oren Leiman (cherry picked from commit 2401fa30fb0f6949cba7ab33e161d0315b7c819c) --- src/v/transform/api.cc | 4 ++-- src/v/transform/io.h | 5 +++-- src/v/transform/tests/test_fixture.cc | 4 ++-- src/v/transform/tests/test_fixture.h | 2 +- src/v/transform/transform_processor.cc | 13 ++++++++++--- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/v/transform/api.cc b/src/v/transform/api.cc index d098511133056..494234f3d6c20 100644 --- a/src/v/transform/api.cc +++ b/src/v/transform/api.cc @@ -146,7 +146,7 @@ class partition_source final : public source { return model::offset_cast(model::prev_offset(result.value())); } - ss::future + ss::future> offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final { auto result = co_await _partition.timequery(storage::timequery_config( _partition.start_offset(), @@ -157,7 +157,7 @@ class partition_source final : public source { /*as=*/*as, /*client_addr=*/std::nullopt)); if (!result.has_value()) { - co_return kafka::offset::min(); + co_return std::nullopt; } co_return model::offset_cast(result->offset); } diff --git a/src/v/transform/io.h b/src/v/transform/io.h index d1e20da3d8bf4..757cd81b46294 100644 --- a/src/v/transform/io.h +++ b/src/v/transform/io.h @@ -55,9 +55,10 @@ class source { /** * The offset of a record the log for a given timestamp - if the log is - * empty then `kafka::offset::min()` is returned. + * empty or the timestamp is greater than max_timestamp, then std::nullopt + * is returned. */ - virtual ss::future + virtual ss::future> offset_at_timestamp(model::timestamp, ss::abort_source*) = 0; /** diff --git a/src/v/transform/tests/test_fixture.cc b/src/v/transform/tests/test_fixture.cc index a7867b22c3855..59c27f02c5f6c 100644 --- a/src/v/transform/tests/test_fixture.cc +++ b/src/v/transform/tests/test_fixture.cc @@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() { return _batches.rbegin()->first; } -ss::future +ss::future> fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) { // Walk through the batches from most recent and look for the first batch // where the timestamp bounds is inclusive of `ts`. @@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) { co_return model::offset_cast(batch.second.header().last_offset()); } } - co_return kafka::offset{}; + co_return std::nullopt; } ss::future diff --git a/src/v/transform/tests/test_fixture.h b/src/v/transform/tests/test_fixture.h index 5f8ed596b9278..eed3f027efcff 100644 --- a/src/v/transform/tests/test_fixture.h +++ b/src/v/transform/tests/test_fixture.h @@ -77,7 +77,7 @@ class fake_source : public source { ss::future<> start() override; ss::future<> stop() override; kafka::offset latest_offset() override; - ss::future + ss::future> offset_at_timestamp(model::timestamp, ss::abort_source*) override; ss::future read_batch(kafka::offset offset, ss::abort_source* as) override; diff --git a/src/v/transform/transform_processor.cc b/src/v/transform/transform_processor.cc index 96c2d04abac98..2199223112007 100644 --- a/src/v/transform/transform_processor.cc +++ b/src/v/transform/transform_processor.cc @@ -219,13 +219,20 @@ processor::load_latest_committed() { vlog(_logger.debug, "starting at latest: {}", latest); return ssx::now(latest); }, - [this](model::timestamp ts) { + [this, &latest](model::timestamp ts) { vlog(_logger.debug, "starting at timestamp: {}", ts); // We want to *start at* this timestamp, so record that // we're going to commit progress at the offset before, so - // we start inclusive of this offset. + // we start inclusive of this offset. If nothing has been + // committed since the start timestamp, commit progress at + // latest (i.e. start from the end) return _source->offset_at_timestamp(ts, &_as).then( - kafka::prev_offset); + [&latest](std::optional o) + -> ss::future { + return ssx::now( + o.has_value() ? kafka::prev_offset(o.value()) + : latest); + }); }); vlog( _logger.debug, "resolved start offset: {}", *initial_offset); From bda09b8ecb429c66440f542ceb2636798f026bce Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 24 Jun 2024 16:35:20 -0700 Subject: [PATCH 2/2] dt/xform: Test that transforms consume from end of topic Specifically in situations where no records are produced between deploy time and start time. Signed-off-by: Oren Leiman (cherry picked from commit 0f49b21313a5e1b8d3df64ca1674c6d570faf8fa) --- tests/rptest/tests/data_transforms_test.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/rptest/tests/data_transforms_test.py b/tests/rptest/tests/data_transforms_test.py index 0b4eee308987e..430cf42191a5c 100644 --- a/tests/rptest/tests/data_transforms_test.py +++ b/tests/rptest/tests/data_transforms_test.py @@ -23,6 +23,7 @@ from rptest.services.cluster import cluster from rptest.services.redpanda import MetricSamples, MetricsEndpoint from ducktape.utils.util import wait_until +from ducktape.errors import TimeoutError from rptest.services.transform_verifier_service import TransformVerifierProduceConfig, TransformVerifierProduceStatus, TransformVerifierService, TransformVerifierConsumeConfig, TransformVerifierConsumeStatus from rptest.services.admin import Admin, CommittedWasmOffset @@ -317,6 +318,24 @@ def all_offsets_removed(): retry_on_exc=True, ) + @cluster(num_nodes=4) + def test_consume_from_end(self): + """ + Test that by default transforms read from the end of the topic if no records + are produced between deploy time and transform start time. + """ + input_topic = self.topics[0] + output_topic = self.topics[1] + producer_status = self._produce_input_topic(topic=self.topics[0]) + self._deploy_wasm(name="identity-xform", + input_topic=input_topic, + output_topic=output_topic, + wait_running=True) + + with expect_exception(TimeoutError, lambda _: True): + consumer_status = self._consume_output_topic( + topic=self.topics[1], status=producer_status) + class DataTransformsChainingTest(BaseDataTransformsTest): """