[v24.1.x] Transforms: Fix deploy-time timequery bug #20139

Merged · 2 commits · Jul 11, 2024
4 changes: 2 additions & 2 deletions src/v/transform/api.cc
@@ -146,7 +146,7 @@ class partition_source final : public source {
return model::offset_cast(model::prev_offset(result.value()));
}

- ss::future<kafka::offset>
+ ss::future<std::optional<kafka::offset>>
offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
auto result = co_await _partition.timequery(storage::timequery_config(
_partition.start_offset(),
@@ -157,7 +157,7 @@ class partition_source final : public source {
/*as=*/*as,
/*client_addr=*/std::nullopt));
if (!result.has_value()) {
- co_return kafka::offset::min();
+ co_return std::nullopt;
}
co_return model::offset_cast(result->offset);
}
5 changes: 3 additions & 2 deletions src/v/transform/io.h
@@ -55,9 +55,10 @@ class source {

/**
* The offset of a record the log for a given timestamp - if the log is
- * empty then `kafka::offset::min()` is returned.
+ * empty or the timestamp is greater than max_timestamp, then std::nullopt
+ * is returned.
*/
- virtual ss::future<kafka::offset>
+ virtual ss::future<std::optional<kafka::offset>>
offset_at_timestamp(model::timestamp, ss::abort_source*) = 0;

/**
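The interface change above is the core of the fix: a timequery that finds nothing is now reported as an empty optional rather than the sentinel `kafka::offset::min()`, which a caller could not tell apart from a legitimate offset at the start of the log. A minimal self-contained sketch of the same contract in plain C++ (the `timestamp_t`/`offset_t` aliases and the `lookup_offset_at` helper are hypothetical stand-ins, not Redpanda types):

#include <cstdint>
#include <map>
#include <optional>

using timestamp_t = int64_t; // stand-in for model::timestamp
using offset_t = int64_t;    // stand-in for kafka::offset

// Ordered index from batch timestamp to first offset at or after it, so
// lower_bound() finds the first entry at or after the queried timestamp.
using time_index = std::map<timestamp_t, offset_t>;

// Old contract (conceptually): return a minimum sentinel offset on a miss,
// leaving the caller unable to distinguish "no data" from "data at the start".
// New contract: make the miss explicit.
std::optional<offset_t> lookup_offset_at(const time_index& idx, timestamp_t ts) {
    auto it = idx.lower_bound(ts);
    if (it == idx.end()) {
        return std::nullopt; // log is empty, or ts is past the newest record
    }
    return it->second;
}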
4 changes: 2 additions & 2 deletions src/v/transform/tests/test_fixture.cc
@@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() {
return _batches.rbegin()->first;
}

- ss::future<kafka::offset>
+ ss::future<std::optional<kafka::offset>>
fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
// Walk through the batches from most recent and look for the first batch
// where the timestamp bounds is inclusive of `ts`.
@@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
co_return model::offset_cast(batch.second.header().last_offset());
}
}
- co_return kafka::offset{};
+ co_return std::nullopt;
}

ss::future<model::record_batch_reader>
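The fake source mirrors the same change: its lookup walks batches from newest to oldest and, when no batch's timestamp range covers the query, now reports a miss instead of a default-constructed offset. A rough sketch of that walk (the `FakeBatch` struct and its field names are invented for illustration; the real fixture stores `model::record_batch` values keyed by `kafka::offset`):

#include <cstdint>
#include <map>
#include <optional>

struct FakeBatch {
    int64_t first_timestamp; // timestamp of the first record in the batch
    int64_t max_timestamp;   // timestamp of the last record in the batch
    int64_t last_offset;     // offset of the last record in the batch
};

std::optional<int64_t> offset_at_timestamp(
  const std::map<int64_t, FakeBatch>& batches, int64_t ts) {
    // Walk from the most recent batch backwards and return the first batch
    // whose timestamp bounds include `ts`.
    for (auto it = batches.rbegin(); it != batches.rend(); ++it) {
        const auto& b = it->second;
        if (ts >= b.first_timestamp && ts <= b.max_timestamp) {
            return b.last_offset;
        }
    }
    return std::nullopt; // no batch covers ts
}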
2 changes: 1 addition & 1 deletion src/v/transform/tests/test_fixture.h
@@ -77,7 +77,7 @@ class fake_source : public source {
ss::future<> start() override;
ss::future<> stop() override;
kafka::offset latest_offset() override;
- ss::future<kafka::offset>
+ ss::future<std::optional<kafka::offset>>
offset_at_timestamp(model::timestamp, ss::abort_source*) override;
ss::future<model::record_batch_reader>
read_batch(kafka::offset offset, ss::abort_source* as) override;
13 changes: 10 additions & 3 deletions src/v/transform/transform_processor.cc
@@ -219,13 +219,20 @@ processor::load_latest_committed() {
vlog(_logger.debug, "starting at latest: {}", latest);
return ssx::now(latest);
},
- [this](model::timestamp ts) {
+ [this, &latest](model::timestamp ts) {
vlog(_logger.debug, "starting at timestamp: {}", ts);
// We want to *start at* this timestamp, so record that
// we're going to commit progress at the offset before, so
- // we start inclusive of this offset.
+ // we start inclusive of this offset. If nothing has been
+ // committed since the start timestamp, commit progress at
+ // latest (i.e. start from the end)
return _source->offset_at_timestamp(ts, &_as).then(
- kafka::prev_offset);
+ [&latest](std::optional<kafka::offset> o)
+   -> ss::future<kafka::offset> {
+     return ssx::now(
+       o.has_value() ? kafka::prev_offset(o.value())
+                     : latest);
+ });
});
vlog(
_logger.debug, "resolved start offset: {}", *initial_offset);
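This hunk is where the deploy-time bug showed up: previously a timequery miss came back as `kafka::offset::min()`, was run through `kafka::prev_offset`, and the processor committed a start position at the beginning of the log, so a freshly deployed transform replayed the whole topic. With the optional result, the miss case now falls back to `latest`. A synchronous sketch of the resolution logic, with plain types standing in for the Seastar future chain (names here are illustrative, not the Redpanda API):

#include <cstdint>
#include <optional>

using offset_t = int64_t; // stand-in for kafka::offset

// Stand-in for kafka::prev_offset, for illustration only.
constexpr offset_t prev_offset(offset_t o) { return o - 1; }

// Resolve where a timestamp-deployed transform should begin:
//  - timequery hit: commit the offset just before it, so consumption starts
//    inclusive of the record at the requested timestamp;
//  - timequery miss: nothing exists at or after the timestamp, so commit at
//    `latest` and start from the end of the topic.
offset_t resolve_start_offset(std::optional<offset_t> hit, offset_t latest) {
    return hit.has_value() ? prev_offset(hit.value()) : latest;
}

The new ducktape test below exercises exactly this path: it produces to the input topic, deploys the transform, and then expects the output-topic consumer to time out, because the records produced before deployment should never be replayed.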
19 changes: 19 additions & 0 deletions tests/rptest/tests/data_transforms_test.py
@@ -23,6 +23,7 @@
from rptest.services.cluster import cluster
from rptest.services.redpanda import MetricSamples, MetricsEndpoint
from ducktape.utils.util import wait_until
+ from ducktape.errors import TimeoutError
from rptest.services.transform_verifier_service import TransformVerifierProduceConfig, TransformVerifierProduceStatus, TransformVerifierService, TransformVerifierConsumeConfig, TransformVerifierConsumeStatus
from rptest.services.admin import Admin, CommittedWasmOffset

Expand Down Expand Up @@ -317,6 +318,24 @@ def all_offsets_removed():
retry_on_exc=True,
)

+    @cluster(num_nodes=4)
+    def test_consume_from_end(self):
+        """
+        Test that by default transforms read from the end of the topic if no records
+        are produced between deploy time and transform start time.
+        """
+        input_topic = self.topics[0]
+        output_topic = self.topics[1]
+        producer_status = self._produce_input_topic(topic=self.topics[0])
+        self._deploy_wasm(name="identity-xform",
+                          input_topic=input_topic,
+                          output_topic=output_topic,
+                          wait_running=True)
+
+        with expect_exception(TimeoutError, lambda _: True):
+            consumer_status = self._consume_output_topic(
+                topic=self.topics[1], status=producer_status)


class DataTransformsChainingTest(BaseDataTransformsTest):
"""