diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 279fc9b76851..8d116e686f04 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -849,16 +849,31 @@ ss::future<> group_manager::handle_partition_leader_change( std::nullopt, std::nullopt, std::nullopt); - + auto expected_to_read = model::prev_offset( + p->partition->high_watermark()); return p->partition->make_reader(reader_config) - .then([this, term, p, timeout]( + .then([this, term, p, timeout, expected_to_read]( model::record_batch_reader reader) { return std::move(reader) .consume( group_recovery_consumer(_serializer_factory(), p->as), timeout) - .then([this, term, p]( + .then([this, term, p, expected_to_read]( group_recovery_consumer_state state) { + if (state.last_read_offset < expected_to_read) { + vlog( + klog.error, + "error recovering group state from {}. " + "Expected to read up to {} but last offset " + "consumed is equal to {}", + p->partition->ntp(), + expected_to_read, + state.last_read_offset); + // force step down to allow other node to + // recover group + return p->partition->raft()->step_down( + "unable to recover group, short read"); + } // avoid trying to recover if we stopped the // reader because an abort was requested if (p->as.abort_requested()) { diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index a7223721641b..834db2fd1724 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -74,6 +74,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { if (_as.abort_requested()) { co_return ss::stop_iteration::yes; } + _state.last_read_offset = batch.last_offset(); if (batch.header().type == model::record_batch_type::raft_data) { _batch_base_offset = batch.base_offset(); co_await model::for_each_record(batch, [this](model::record& r) { diff --git a/src/v/kafka/server/group_recovery_consumer.h b/src/v/kafka/server/group_recovery_consumer.h index 0901017ea36f..c4e9830e430f 100644 --- a/src/v/kafka/server/group_recovery_consumer.h +++ b/src/v/kafka/server/group_recovery_consumer.h @@ -12,6 +12,7 @@ #include "kafka/server/group_metadata.h" #include "kafka/server/group_stm.h" +#include "model/fundamental.h" #include #include @@ -30,6 +31,7 @@ struct group_recovery_consumer_state { * retention feature is activated. see group::offset_metadata for more info. */ bool has_offset_retention_feature_fence{false}; + model::offset last_read_offset; }; class group_recovery_consumer {