From 93428112eb256a4daabe107489f33bc6358bfa14 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 16 Apr 2024 17:46:53 +0200 Subject: [PATCH] model: add record_batch_reader::peek_each_ref It is similar to for_each_ref, but advances only if the consumer returns ss::stop_iteration::no. I.e. the batch where the consumer stopped remains available for reading by subsequent consumers. --- src/v/model/record_batch_reader.h | 41 +++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/v/model/record_batch_reader.h b/src/v/model/record_batch_reader.h index 847ff2ab0bc4a..225b1a65c8ca4 100644 --- a/src/v/model/record_batch_reader.h +++ b/src/v/model/record_batch_reader.h @@ -98,6 +98,12 @@ class record_batch_reader final { return do_consume(consumer, timeout); }); } + template + auto peek_each_ref(ReferenceConsumer c, timeout_clock::time_point tm) { + return ss::do_with(std::move(c), [this, tm](ReferenceConsumer& c) { + return do_peek_each_ref(c, tm); + }); + } private: record_batch pop_batch() { @@ -142,6 +148,31 @@ class record_batch_reader final { return c(pop_batch()); }); } + template + auto do_peek_each_ref( + ReferenceConsumer& refc, timeout_clock::time_point timeout) { + return do_action(refc, timeout, [this](ReferenceConsumer& c) { + return ss::visit( + _slice, + [&c](data_t& d) { + return c(d.front()).then([&](ss::stop_iteration stop) { + if (!stop) { + d.pop_front(); + } + return stop; + }); + }, + [&c](foreign_data_t& d) { + return c((*d.buffer)[d.index]) + .then([&](ss::stop_iteration stop) { + if (!stop) { + ++d.index; + } + return stop; + }); + }); + }); + } template auto do_action( ConsumerType& consumer, @@ -249,6 +280,16 @@ class record_batch_reader final { }); } + /// Similar to for_each_ref, but advances only if the consumer returns + /// ss::stop_iteration::no. I.e. the batch where the consumer stopped + /// remains available for reading by subsequent consumers. + template + requires ReferenceBatchReaderConsumer + auto peek_each_ref( + ReferenceConsumer consumer, timeout_clock::time_point timeout) & { + return _impl->peek_each_ref(std::move(consumer), timeout); + } + std::unique_ptr release() && { return std::move(_impl); } private: