Skip to content

Commit

Permalink
Merge pull request redpanda-data#17673 from mmaslankaprv/error-on-partial-recovery
Browse files Browse the repository at this point in the history

Abort recovery when the recovery read is incomplete
  • Loading branch information
mmaslankaprv authored Apr 9, 2024
2 parents 4c47fe9 + 9d75795 commit 768cdf2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -849,16 +849,31 @@ ss::future<> group_manager::handle_partition_leader_change(
std::nullopt,
std::nullopt,
std::nullopt);

auto expected_to_read = model::prev_offset(
p->partition->high_watermark());
return p->partition->make_reader(reader_config)
.then([this, term, p, timeout](
.then([this, term, p, timeout, expected_to_read](
model::record_batch_reader reader) {
return std::move(reader)
.consume(
group_recovery_consumer(_serializer_factory(), p->as),
timeout)
.then([this, term, p](
.then([this, term, p, expected_to_read](
group_recovery_consumer_state state) {
if (state.last_read_offset < expected_to_read) {
vlog(
klog.error,
"error recovering group state from {}. "
"Expected to read up to {} but last offset "
"consumed is equal to {}",
p->partition->ntp(),
expected_to_read,
state.last_read_offset);
// force step down to allow other node to
// recover group
return p->partition->raft()->step_down(
"unable to recover group, short read");
}
// avoid trying to recover if we stopped the
// reader because an abort was requested
if (p->as.abort_requested()) {
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/group_recovery_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
if (_as.abort_requested()) {
co_return ss::stop_iteration::yes;
}
_state.last_read_offset = batch.last_offset();
if (batch.header().type == model::record_batch_type::raft_data) {
_batch_base_offset = batch.base_offset();
co_await model::for_each_record(batch, [this](model::record& r) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/group_recovery_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "kafka/server/group_metadata.h"
#include "kafka/server/group_stm.h"
#include "model/fundamental.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
Expand All @@ -30,6 +31,7 @@ struct group_recovery_consumer_state {
* retention feature is activated. see group::offset_metadata for more info.
*/
bool has_offset_retention_feature_fence{false};
model::offset last_read_offset;
};

class group_recovery_consumer {
Expand Down

0 comments on commit 768cdf2

Please sign in to comment.