Skip to content

Commit

Permalink
k/replicated_partition: fixed fetch offset validation for follower fe…
Browse files Browse the repository at this point in the history
…tching

When consumer is requesting a fetch from the broker being a follower for
given replica the fetch offset validation logic changes comparing to the
validation on a leader. The out of range error should only be returned
if an offset is not available locally and it is not known to be
committed. That was not the case with the current code. The current code
returned out of range error every time the offset was not committed on
leader. This lead to many not necessary errors returned and resulted in
consumer offsets being reset.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
(cherry picked from commit fb48e0a)
  • Loading branch information
mmaslankaprv authored and vbotbuildovich committed Jan 19, 2024
1 parent 5406f5a commit 97bd9ec
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,21 @@ ss::future<error_code> replicated_partition::validate_fetch_offset(
// offset validation logic on follower
if (reading_from_follower && !_partition->is_leader()) {
auto ec = error_code::none;
if (fetch_offset < start_offset()) {

const std::pair<model::offset, model::offset> bounds = std::minmax(
leader_high_watermark(), log_end_offset());
const auto effective_log_end_offset = bounds.second;
const auto available_to_read = bounds.first;
if (
fetch_offset < start_offset()
|| fetch_offset > effective_log_end_offset) {
ec = error_code::offset_out_of_range;
} else if (
fetch_offset > high_watermark()
&& fetch_offset <= leader_high_watermark()) {
} else if (fetch_offset > available_to_read) {
/**
* Offset know to be committed but not yet available on the
* follower.
*/
ec = error_code::offset_not_available;
} else if (fetch_offset > leader_high_watermark()) {
ec = error_code::offset_out_of_range;
}

if (ec != error_code::none) {
Expand Down

0 comments on commit 97bd9ec

Please sign in to comment.