Merge pull request #20127 from oleiman/xform/noticket/timequery-bug
Transforms: Fix deploy-time timequery bug
oleiman authored Jun 25, 2024
2 parents d3cd417 + 0f49b21 commit b2f732f
Showing 6 changed files with 36 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/v/transform/api.cc

@@ -147,7 +147,7 @@ class partition_source final : public source {
         return model::offset_cast(model::prev_offset(result.value()));
    }

-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
    offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
        auto result = co_await _partition.timequery(storage::timequery_config(
          _partition.start_offset(),
@@ -158,7 +158,7 @@ class partition_source final : public source {
          /*as=*/*as,
          /*client_addr=*/std::nullopt));
        if (!result.has_value()) {
-            co_return kafka::offset::min();
+            co_return std::nullopt;
        }
        co_return model::offset_cast(result->offset);
    }
5 changes: 3 additions & 2 deletions src/v/transform/io.h

@@ -55,9 +55,10 @@ class source {

    /**
     * The offset of a record in the log for a given timestamp - if the log is
-     * empty then `kafka::offset::min()` is returned.
+     * empty or the timestamp is greater than max_timestamp, then std::nullopt
+     * is returned.
     */
-    virtual ss::future<kafka::offset>
+    virtual ss::future<std::optional<kafka::offset>>
    offset_at_timestamp(model::timestamp, ss::abort_source*) = 0;

    /**
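
For readers skimming the diff, here is a minimal, self-contained sketch of the new `offset_at_timestamp` contract described in the doc comment above. It models offsets and timestamps as plain integers instead of Redpanda's `kafka::offset` and `model::timestamp` types, and the toy `record`/`log` names are invented for illustration:

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Toy record: a timestamp/offset pair, standing in for a real log entry.
struct record {
    int64_t timestamp;
    int64_t offset;
};

// Sketch of the new contract: return the offset of the first record whose
// timestamp is >= ts, or std::nullopt when the log is empty or ts is past
// the newest record. (The old contract returned a kafka::offset::min()
// sentinel in the miss case, which callers had to special-case.)
std::optional<int64_t>
offset_at_timestamp(const std::vector<record>& log, int64_t ts) {
    for (const auto& r : log) {
        if (r.timestamp >= ts) {
            return r.offset;
        }
    }
    return std::nullopt;
}

int main() {
    std::vector<record> log = {{100, 0}, {200, 1}, {300, 2}};
    std::cout << *offset_at_timestamp(log, 150) << '\n';             // 1
    std::cout << offset_at_timestamp(log, 400).has_value() << '\n';  // 0 (miss)
}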
4 changes: 2 additions & 2 deletions src/v/transform/tests/test_fixture.cc

@@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() {
     return _batches.rbegin()->first;
 }

-ss::future<kafka::offset>
+ss::future<std::optional<kafka::offset>>
 fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
     // Walk through the batches from most recent and look for the first batch
     // where the timestamp bounds are inclusive of `ts`.
@@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
             co_return model::offset_cast(batch.second.header().last_offset());
         }
     }
-    co_return kafka::offset{};
+    co_return std::nullopt;
 }

 ss::future<model::record_batch_reader>
2 changes: 1 addition & 1 deletion src/v/transform/tests/test_fixture.h

@@ -77,7 +77,7 @@ class fake_source : public source {
     ss::future<> start() override;
     ss::future<> stop() override;
     kafka::offset latest_offset() override;
-    ss::future<kafka::offset>
+    ss::future<std::optional<kafka::offset>>
     offset_at_timestamp(model::timestamp, ss::abort_source*) override;
     ss::future<model::record_batch_reader>
     read_batch(kafka::offset offset, ss::abort_source* as) override;
13 changes: 10 additions & 3 deletions src/v/transform/transform_processor.cc

@@ -216,13 +216,20 @@ processor::load_latest_committed() {
           vlog(_logger.debug, "starting at latest: {}", latest);
           return ssx::now(latest);
       },
-      [this](model::timestamp ts) {
+      [this, &latest](model::timestamp ts) {
           vlog(_logger.debug, "starting at timestamp: {}", ts);
           // We want to *start at* this timestamp, so record that
           // we're going to commit progress at the offset before, so
-          // we start inclusive of this offset.
+          // we start inclusive of this offset. If nothing has been
+          // committed since the start timestamp, commit progress at
+          // latest (i.e. start from the end)
           return _source->offset_at_timestamp(ts, &_as).then(
-            kafka::prev_offset);
+            [&latest](std::optional<kafka::offset> o)
+              -> ss::future<kafka::offset> {
+                return ssx::now(
+                  o.has_value() ? kafka::prev_offset(o.value())
+                                : latest);
+            });
       });
     vlog(
       _logger.debug, "resolved start offset: {}", *initial_offset);
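
A minimal sketch of the start-offset resolution the new continuation implements, assuming plain integers in place of kafka::offset; `resolve_start_offset` and the integer `prev_offset` here are hypothetical stand-ins, not functions from the codebase:

#include <cassert>
#include <cstdint>
#include <optional>

// Stand-in for kafka::prev_offset(): the offset immediately before `o`.
int64_t prev_offset(int64_t o) { return o - 1; }

// Mirrors the new continuation in load_latest_committed(): when the
// timequery hits, commit progress just before the hit so that offset itself
// gets consumed; when it misses (std::nullopt), fall back to `latest`,
// i.e. start from the end of the topic.
int64_t
resolve_start_offset(std::optional<int64_t> timequery_result, int64_t latest) {
    return timequery_result.has_value() ? prev_offset(*timequery_result)
                                        : latest;
}

int main() {
    assert(resolve_start_offset(42, 100) == 41);            // hit: start before 42
    assert(resolve_start_offset(std::nullopt, 100) == 100); // miss: start at end
}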
18 changes: 18 additions & 0 deletions tests/rptest/tests/data_transforms_test.py

@@ -398,6 +398,24 @@ def env_is(env: dict[str, str]):
             err_msg="some partitions did not clear their envs",
             retry_on_exc=True)
 
+    @cluster(num_nodes=4)
+    def test_consume_from_end(self):
+        """
+        Test that by default transforms read from the end of the topic if no records
+        are produced between deploy time and transform start time.
+        """
+        input_topic = self.topics[0]
+        output_topic = self.topics[1]
+        producer_status = self._produce_input_topic(topic=self.topics[0])
+        self._deploy_wasm(name="identity-xform",
+                          input_topic=input_topic,
+                          output_topic=output_topic,
+                          wait_running=True)
+
+        with expect_exception(TimeoutError, lambda _: True):
+            consumer_status = self._consume_output_topic(
+                topic=self.topics[1], status=producer_status)
+
 
 class DataTransformsChainingTest(BaseDataTransformsTest):
     """
