Skip to content

Commit

Permalink
Merge pull request #17895 from ztlpn/raft-fix-delayed-requests
Browse files Browse the repository at this point in the history
raft: in append_entries skip batches that we already have
  • Loading branch information
ztlpn authored Apr 18, 2024
2 parents 1f5a423 + f0c5772 commit ff870e1
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 224 deletions.
41 changes: 41 additions & 0 deletions src/v/model/record_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class record_batch_reader final {
return do_consume(consumer, timeout);
});
}
template<typename ReferenceConsumer>
auto peek_each_ref(ReferenceConsumer c, timeout_clock::time_point tm) {
return ss::do_with(std::move(c), [this, tm](ReferenceConsumer& c) {
return do_peek_each_ref(c, tm);
});
}

private:
record_batch pop_batch() {
Expand Down Expand Up @@ -142,6 +148,31 @@ class record_batch_reader final {
return c(pop_batch());
});
}
template<typename ReferenceConsumer>
auto do_peek_each_ref(
ReferenceConsumer& refc, timeout_clock::time_point timeout) {
return do_action(refc, timeout, [this](ReferenceConsumer& c) {
return ss::visit(
_slice,
[&c](data_t& d) {
return c(d.front()).then([&](ss::stop_iteration stop) {
if (!stop) {
d.pop_front();
}
return stop;
});
},
[&c](foreign_data_t& d) {
return c((*d.buffer)[d.index])
.then([&](ss::stop_iteration stop) {
if (!stop) {
++d.index;
}
return stop;
});
});
});
}
template<typename ConsumerType, typename ActionFn>
auto do_action(
ConsumerType& consumer,
Expand Down Expand Up @@ -249,6 +280,16 @@ class record_batch_reader final {
});
}

/// Similar to for_each_ref, but advances only if the consumer returns
/// ss::stop_iteration::no. I.e. the batch where the consumer stopped
/// remains available for reading by subsequent consumers.
template<typename ReferenceConsumer>
requires ReferenceBatchReaderConsumer<ReferenceConsumer>
auto peek_each_ref(
ReferenceConsumer consumer, timeout_clock::time_point timeout) & {
return _impl->peek_each_ref(std::move(consumer), timeout);
}

std::unique_ptr<impl> release() && { return std::move(_impl); }

private:
Expand Down
Loading

0 comments on commit ff870e1

Please sign in to comment.