Transforms: Fix deploy-time timequery bug #20127
@@ -398,6 +398,24 @@ def env_is(env: dict[str, str]):
             err_msg="some partitions did not clear their envs",
             retry_on_exc=True)

+    @cluster(num_nodes=4)
+    def test_consume_from_end(self):
Did this test fail before?
yep
@@ -57,7 +57,7 @@ class source {
      * The offset of a record the log for a given timestamp - if the log is
      * empty then `kafka::offset::min()` is returned.
      */
-    virtual ss::future<kafka::offset>
+    virtual ss::future<std::optional<kafka::offset>>
Can we update the docs? The comment about `min` is obsolete, for example. What does `std::nullopt` mean? Can we confirm with the storage team?
> update docs

ya, looks like I smashed the change while shuffling commits

> what does std::nullopt mean

local case:
redpanda/src/v/storage/disk_log_impl.cc
Lines 2450 to 2470 in d692443

disk_log_impl::timequery(timequery_config cfg) {
    vassert(!_closed, "timequery on closed log - {}", *this);
    if (_segs.empty()) {
        return ss::make_ready_future<std::optional<timequery_result>>();
    }
    return make_reader(cfg).then([cfg](model::record_batch_reader reader) {
        return model::consume_reader_to_memory(
                 std::move(reader), model::no_timeout)
          .then([cfg](model::record_batch_reader::storage_t st) {
              using ret_t = std::optional<timequery_result>;
              auto& batches = std::get<model::record_batch_reader::data_t>(st);
              if (
                !batches.empty()
                && batches.front().header().max_timestamp >= cfg.time) {
                  return ret_t(batch_timequery(
                    batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
              }
              return ret_t();
          });
    });
}
I read that as one of:
a) log empty
b) no batches in timequery request offset range
c) max_timestamp in read batches < timequery request timestamp
which tracks I think? "empty query result".
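To make the three "empty result" cases concrete, here is a minimal, self-contained sketch. It is not the Redpanda implementation: `batch`, `query`, and `timequery_sketch` are hypothetical stand-ins, offsets and timestamps are plain integers, and the lookup works at batch granularity rather than per record.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical, simplified stand-ins for the storage types (assumption).
struct batch {
    int64_t base_offset;
    int64_t last_offset;
    int64_t max_timestamp;
};

struct query {
    int64_t time;       // timestamp being looked up
    int64_t min_offset; // inclusive lower bound of the offset range
    int64_t max_offset; // inclusive upper bound of the offset range
};

// Returns the first offset in range belonging to a batch whose
// max_timestamp >= q.time, or nullopt when the query result is empty.
std::optional<int64_t>
timequery_sketch(const std::vector<batch>& log, const query& q) {
    assert(q.min_offset <= q.max_offset);
    if (log.empty()) {
        return std::nullopt; // (a) log empty
    }
    for (const auto& b : log) {
        if (b.last_offset < q.min_offset || b.base_offset > q.max_offset) {
            continue; // batch lies outside the requested offset range
        }
        if (b.max_timestamp >= q.time) {
            return std::max(b.base_offset, q.min_offset);
        }
    }
    // (b) no batch in the offset range, or
    // (c) every batch in range has max_timestamp < q.time
    return std::nullopt;
}
```

In this sketch the caller cannot tell (a), (b), and (c) apart; all three surface as an empty optional, which matches the "empty query result" reading above.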
(b) doesn't really apply here by design, and `latest` seems like a sensible fallback for (a) or (c)
LGTM
Default behavior is for transforms to start processing input from an offset equivalent to "the latest committed offset at deploy time". However, in situations where no records are committed between the moment the transform was deployed and the moment the transform starts, the timequery servicing this logic returns an empty result, which is handled by returning offset::min, which signals the transform to begin processing at the beginning of the topic.

This commit adjusts the logic in source::offset_at_timestamp to return nullopt for an empty timequery result, and the handling of that nullopt to resolve the transform start offset to latest.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>

Specifically in situations where no records are produced between deploy time and start time.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
dfdac2f to 0f49b21
force push - comments on
lgtm
/backport v24.1.x
Failed to create a backport PR to v24.1.x branch. I tried:
Default behavior is for transforms to start processing input from an offset
equivalent to "the latest committed offset at deploy time". However, in
situations where no records are committed between the moment the transform
was deployed and the moment the transform starts, the timequery servicing
this logic returns nullopt and we erroneously start processing at the
beginning of the topic.
This commit corrects the handling of the timequery result to commit the
latest offset (computed at start time) in the situation described above.
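
The difference in handling can be sketched as follows. This is an illustration under assumptions, not the code in this PR: `offset_t`, `resolve_start_before_fix`, and `resolve_start_after_fix` are hypothetical names, and offsets are modelled as plain integers.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical offset type for illustration (assumption).
using offset_t = int64_t;

// Before the fix, as described above: an empty timequery result ended up
// as the minimum offset, so the transform started at the beginning of the
// topic.
offset_t resolve_start_before_fix(
  std::optional<offset_t> timequery_result, offset_t min_offset) {
    return timequery_result.value_or(min_offset);
}

// After the fix: an empty result falls back to the latest offset, computed
// when the transform starts.
offset_t resolve_start_after_fix(
  std::optional<offset_t> timequery_result, offset_t latest_at_start) {
    assert(latest_at_start >= 0); // offsets are non-negative in this sketch
    return timequery_result.value_or(latest_at_start);
}
```

When the timequery does find an offset, both versions return it unchanged; they differ only in how the empty result is resolved.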
Backports Required
Release Notes
Bug Fixes