Transforms: Fix deploy-time timequery bug #20127

Merged (2 commits), Jun 25, 2024
4 changes: 2 additions & 2 deletions src/v/transform/api.cc
@@ -147,7 +147,7 @@ class partition_source final : public source {
return model::offset_cast(model::prev_offset(result.value()));
}

ss::future<kafka::offset>
ss::future<std::optional<kafka::offset>>
offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
auto result = co_await _partition.timequery(storage::timequery_config(
_partition.start_offset(),
@@ -158,7 +158,7 @@
/*as=*/*as,
/*client_addr=*/std::nullopt));
if (!result.has_value()) {
co_return kafka::offset::min();
co_return std::nullopt;
}
co_return model::offset_cast(result->offset);
}
5 changes: 3 additions & 2 deletions src/v/transform/io.h
@@ -55,9 +55,10 @@ class source {

/**
* The offset of a record the log for a given timestamp - if the log is
* empty then `kafka::offset::min()` is returned.
* empty or the timestamp is greater than max_timestamp, then std::nullopt
* is returned.
*/
virtual ss::future<kafka::offset>
virtual ss::future<std::optional<kafka::offset>>


Can we update the docs (the comment about min is obsolete for example). What does std::nullopt mean? Can we confirm with the storage team?

@oleiman (Member Author), Jun 25, 2024:


> update docs

ya, looks like I smashed the change while shuffling commits

> what does std::nullopt mean

local case:

disk_log_impl::timequery(timequery_config cfg) {
    vassert(!_closed, "timequery on closed log - {}", *this);
    if (_segs.empty()) {
        return ss::make_ready_future<std::optional<timequery_result>>();
    }
    return make_reader(cfg).then([cfg](model::record_batch_reader reader) {
        return model::consume_reader_to_memory(
                 std::move(reader), model::no_timeout)
          .then([cfg](model::record_batch_reader::storage_t st) {
              using ret_t = std::optional<timequery_result>;
              auto& batches = std::get<model::record_batch_reader::data_t>(st);
              if (
                !batches.empty()
                && batches.front().header().max_timestamp >= cfg.time) {
                  return ret_t(batch_timequery(
                    batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
              }
              return ret_t();
          });
    });
}

I read that as one of:

a) log empty
b) no batches in timequery request offset range
c) max_timestamp in read batches < timequery request timestamp

which tracks I think? "empty query result".
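
As a minimal sketch of that reading (simplified stand-in types, not the actual Redpanda code; case (b), an offset range containing no batches, is omitted since this sketch has no offset range), the storage-side decision reduces to finding the first batch whose max_timestamp covers the queried timestamp and reporting std::nullopt otherwise:

#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-in for the storage batch metadata; offsets and timestamps
// are raw integers here.
struct batch_meta {
    int64_t base_offset;
    int64_t max_timestamp;
};

// Returns the base offset of the first batch with max_timestamp >= ts, or
// std::nullopt for an "empty query result": (a) the log is empty, or
// (c) every batch's max_timestamp is older than ts.
std::optional<int64_t>
offset_at_timestamp(const std::vector<batch_meta>& log, int64_t ts) {
    for (const auto& b : log) {
        if (b.max_timestamp >= ts) {
            return b.base_offset;
        }
    }
    return std::nullopt;
}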

@oleiman (Member Author):

(b) doesn't really apply here by design, and latest seems like a sensible fallback for (a) or (c)


LGTM

offset_at_timestamp(model::timestamp, ss::abort_source*) = 0;

/**
4 changes: 2 additions & 2 deletions src/v/transform/tests/test_fixture.cc
@@ -68,7 +68,7 @@ kafka::offset fake_source::latest_offset() {
return _batches.rbegin()->first;
}

ss::future<kafka::offset>
ss::future<std::optional<kafka::offset>>
fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
// Walk through the batches from most recent and look for the first batch
// where the timestamp bounds is inclusive of `ts`.
@@ -85,7 +85,7 @@ fake_source::offset_at_timestamp(model::timestamp ts, ss::abort_source*) {
co_return model::offset_cast(batch.second.header().last_offset());
}
}
co_return kafka::offset{};
co_return std::nullopt;
}

ss::future<model::record_batch_reader>
2 changes: 1 addition & 1 deletion src/v/transform/tests/test_fixture.h
@@ -77,7 +77,7 @@ class fake_source : public source {
ss::future<> start() override;
ss::future<> stop() override;
kafka::offset latest_offset() override;
ss::future<kafka::offset>
ss::future<std::optional<kafka::offset>>
offset_at_timestamp(model::timestamp, ss::abort_source*) override;
ss::future<model::record_batch_reader>
read_batch(kafka::offset offset, ss::abort_source* as) override;
13 changes: 10 additions & 3 deletions src/v/transform/transform_processor.cc
@@ -216,13 +216,20 @@ processor::load_latest_committed() {
vlog(_logger.debug, "starting at latest: {}", latest);
return ssx::now(latest);
},
[this](model::timestamp ts) {
[this, &latest](model::timestamp ts) {
vlog(_logger.debug, "starting at timestamp: {}", ts);
// We want to *start at* this timestamp, so record that
// we're going to commit progress at the offset before, so
// we start inclusive of this offset.
// we start inclusive of this offset. If nothing has been
// committed since the start timestamp, commit progress at
// latest (i.e. start from the end)
return _source->offset_at_timestamp(ts, &_as).then(
kafka::prev_offset);
[&latest](std::optional<kafka::offset> o)
-> ss::future<kafka::offset> {
return ssx::now(
o.has_value() ? kafka::prev_offset(o.value())
: latest);
});
});
vlog(
_logger.debug, "resolved start offset: {}", *initial_offset);
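
The caller-side half of the fix, sketched in isolation (assumed, simplified helpers rather than the real kafka:: types): committing the offset just before the resolved start offset makes the restart inclusive of that offset, and a missing timequery result falls back to the latest offset so the transform starts from the end of the log.

#include <cstdint>
#include <optional>

using offset_t = int64_t;

// Assumed stand-in for kafka::prev_offset; the real helper also handles the
// minimum-offset edge case.
offset_t prev_offset(offset_t o) { return o - 1; }

// Offset to record as the initial committed progress: one before the offset
// resolved for the start timestamp, or `latest` when the timequery returned
// std::nullopt (nothing at or after that timestamp).
offset_t initial_committed_offset(
  std::optional<offset_t> offset_at_ts, offset_t latest) {
    return offset_at_ts.has_value() ? prev_offset(*offset_at_ts) : latest;
}
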
18 changes: 18 additions & 0 deletions tests/rptest/tests/data_transforms_test.py
@@ -398,6 +398,24 @@ def env_is(env: dict[str, str]):
err_msg="some partitions did not clear their envs",
retry_on_exc=True)

    @cluster(num_nodes=4)
    def test_consume_from_end(self):


Did this test fail before?

@oleiman (Member Author):

yep

"""
Test that by default transforms read from the end of the topic if no records
are produced between deploy time and transform start time.
"""
input_topic = self.topics[0]
output_topic = self.topics[1]
producer_status = self._produce_input_topic(topic=self.topics[0])
self._deploy_wasm(name="identity-xform",
input_topic=input_topic,
output_topic=output_topic,
wait_running=True)

with expect_exception(TimeoutError, lambda _: True):
consumer_status = self._consume_output_topic(
topic=self.topics[1], status=producer_status)


class DataTransformsChainingTest(BaseDataTransformsTest):
"""