xform: Fix cold-start timequery bug
Default behavior is for a transform to start processing input from an
offset equivalent to "the latest committed offset at deploy time".
However, if no records are committed between the moment the transform is
deployed and the moment it starts running, the timequery servicing this
logic returns an empty result. Previously that empty result was handled
by returning offset::min, which signals the transform to begin processing
from the beginning of the topic.

This commit adjusts source::offset_at_timestamp to return std::nullopt
for an empty timequery result, and adjusts the handling of that nullopt
to resolve the transform start offset to latest.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
oleiman committed Jun 25, 2024
1 parent 92a4a69 commit 2401fa3
Showing 5 changed files with 18 additions and 10 deletions.
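
The resolution rule introduced by the diffs below can be condensed into a
standalone sketch. It models offsets as plain int64_t and drops the
ss::future / kafka::offset machinery, so the types and names here are
illustrative rather than Redpanda's actual API:

#include <cstdint>
#include <iostream>
#include <optional>

using offset_t = std::int64_t;

// Stand-in for the handling in processor::load_latest_committed():
//  - timequery hit   -> commit the previous offset, so processing starts
//                       inclusive of the queried offset
//  - empty timequery -> fall back to `latest` (start from the end), rather
//                       than the old behavior of offset::min (start of topic)
offset_t resolve_start_offset(std::optional<offset_t> queried, offset_t latest) {
    return queried.has_value() ? *queried - 1 /* prev_offset */ : latest;
}

int main() {
    // Records exist at or after the start timestamp: begin just before them.
    std::cout << resolve_start_offset(42, 100) << '\n';           // prints 41
    // Cold start: nothing committed since deploy, the timequery is empty,
    // and the transform starts at latest instead of the topic's beginning.
    std::cout << resolve_start_offset(std::nullopt, 100) << '\n'; // prints 100
}
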
4 changes: 2 additions & 2 deletions src/v/transform/api.cc
@@ -147,7 +147,7 @@ class partition_source final : public source {
         return model::offset_cast(model::prev_offset(result.value()));
     }
 
-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
         auto result = co_await _partition.timequery(storage::timequery_config(
           _partition.start_offset(),
@@ -158,7 +158,7 @@ class partition_source final : public source {
           /*as=*/*as,
           /*client_addr=*/std::nullopt));
         if (!result.has_value()) {
-            co_return kafka::offset::min();
+            co_return std::nullopt;
         }
         co_return model::offset_cast(result->offset);
     }
5 changes: 3 additions & 2 deletions src/v/transform/io.h
@@ -55,9 +55,10 @@ class source {
 
     /**
      * The offset of a record the log for a given timestamp - if the log is
-     * empty then `kafka::offset::min()` is returned.
+     * empty or the timestamp is greater than max_timestamp, then std::nullopt
+     * is returned.
      */
-    virtual ss::future<kafka::offset>
+    virtual ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp, ss::abort_source*) = 0;
 
     /**
4 changes: 2 additions & 2 deletions src/v/transform/tests/test_fixture.cc
@@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() {
     return _batches.rbegin()->first;
 }
 
-ss::future<kafka::offset>
+ss::future<std::optional<kafka::offset>>
 fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
     // Walk through the batches from most recent and look for the first batch
     // where the timestamp bounds is inclusive of `ts`.
@@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
             co_return model::offset_cast(batch.second.header().last_offset());
         }
     }
-    co_return kafka::offset{};
+    co_return std::nullopt;
 }
 
 ss::future<model::record_batch_reader>
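
The fixture lookup changed above can be sketched in standalone form as well.
This follows the comment's description (walk the batches from most recent and
return the last offset of the first batch whose timestamp bounds include
`ts`); the container, bound fields, and inclusion check are assumptions for
illustration, since the real fake_source stores model::record_batch objects
and returns an ss::future:

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>

using offset_t = std::int64_t;
using timestamp_t = std::int64_t;

// Assumed per-batch metadata: first/last timestamp and the batch's last offset.
struct batch_bounds {
    timestamp_t first_ts;
    timestamp_t max_ts;
    offset_t last_offset;
};

// Walk the batches from most recent to oldest; return the last offset of the
// first batch whose timestamp bounds include `ts`, otherwise std::nullopt
// (previously a default-constructed offset was returned instead).
std::optional<offset_t> offset_at_timestamp(
  const std::map<offset_t, batch_bounds>& batches, timestamp_t ts) {
    for (auto it = batches.rbegin(); it != batches.rend(); ++it) {
        const auto& b = it->second;
        if (ts >= b.first_ts && ts <= b.max_ts) {
            return b.last_offset;
        }
    }
    return std::nullopt;
}

int main() {
    // Keyed by base offset; values are {first_ts, max_ts, last_offset}.
    std::map<offset_t, batch_bounds> batches{
      {0, {10, 19, 4}},
      {5, {20, 29, 9}},
    };
    std::cout << offset_at_timestamp(batches, 25).value() << '\n';     // 9
    std::cout << offset_at_timestamp(batches, 99).has_value() << '\n'; // 0 (empty)
}
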
2 changes: 1 addition & 1 deletion src/v/transform/tests/test_fixture.h
@@ -77,7 +77,7 @@ class fake_source : public source {
     ss::future<> start() override;
     ss::future<> stop() override;
     kafka::offset latest_offset() override;
-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp, ss::abort_source*) override;
     ss::future<model::record_batch_reader>
     read_batch(kafka::offset offset, ss::abort_source* as) override;
13 changes: 10 additions & 3 deletions src/v/transform/transform_processor.cc
@@ -216,13 +216,20 @@ processor::load_latest_committed() {
           vlog(_logger.debug, "starting at latest: {}", latest);
           return ssx::now(latest);
       },
-      [this](model::timestamp ts) {
+      [this, &latest](model::timestamp ts) {
          vlog(_logger.debug, "starting at timestamp: {}", ts);
          // We want to *start at* this timestamp, so record that
          // we're going to commit progress at the offset before, so
-         // we start inclusive of this offset.
+         // we start inclusive of this offset. If nothing has been
+         // committed since the start timestamp, commit progress at
+         // latest (i.e. start from the end)
          return _source->offset_at_timestamp(ts, &_as).then(
-           kafka::prev_offset);
+           [&latest](std::optional<kafka::offset> o)
+             -> ss::future<kafka::offset> {
+               return ssx::now(
+                 o.has_value() ? kafka::prev_offset(o.value())
+                               : latest);
+           });
       });
     vlog(
       _logger.debug, "resolved start offset: {}", *initial_offset);
