diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index 48dc8d3ecae2..03b8201f637f 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -28,6 +28,7 @@
 #include "kafka/protocol/sync_group.h"
 #include "kafka/protocol/txn_offset_commit.h"
 #include "kafka/protocol/wire.h"
+#include "kafka/server/errors.h"
 #include "kafka/server/group_metadata.h"
 #include "kafka/server/logger.h"
 #include "kafka/server/member.h"
@@ -1863,23 +1864,23 @@ group::begin_tx(cluster::begin_group_tx_request r) {
     model::record_batch batch = make_tx_fence_batch(
       r.pid, std::move(fence), use_dedicated_batch_type_for_fence());
     auto reader = model::make_memory_record_batch_reader(std::move(batch));
-    auto res = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
       _term,
       std::move(reader),
       raft::replicate_options(raft::consistency_level::quorum_ack));
 
-    if (!res) {
+    if (!result) {
         vlog(
           _ctx_txlog.warn,
           "begin tx request {} failed - error replicating fencing batch - {}",
           r,
-          res.error().message());
+          result.error().message());
         if (
           _partition->raft()->is_leader()
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group begin_tx failed");
         }
-        co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
+        co_return map_tx_replication_error(result.error());
     }
     auto [producer_it, _] = _producers.try_emplace(
       r.pid.get_id(), r.pid.get_epoch());
@@ -1965,6 +1966,42 @@ group::abort_tx(cluster::abort_group_tx_request r) {
     co_return co_await do_abort(r.group_id, r.pid, r.tx_seq);
 }
 
+cluster::tx::errc group::map_tx_replication_error(std::error_code ec) {
+    auto result_ec = cluster::tx::errc::none;
+    // All generic errors are mapped to not coordinator to force the client to
+    // retry, the errors like timeout and shutdown are mapped to timeout to
+    // indicate the uncertainty of the operation outcome
+    if (ec.category() == raft::error_category()) {
+        switch (static_cast<raft::errc>(ec.value())) {
+        case raft::errc::shutting_down:
+        case raft::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else if (ec.category() == cluster::error_category()) {
+        switch (static_cast<cluster::errc>(ec.value())) {
+        case cluster::errc::shutting_down:
+        case cluster::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else {
+        vlog(_ctx_txlog.warn, "unexpected replication error: {}", ec);
+        result_ec = cluster::tx::errc::not_coordinator;
+    }
+
+    vlog(
+      _ctx_txlog.info,
+      "transactional batch replication error: {}, mapped to: {}",
+      ec,
+      result_ec);
+    return result_ec;
+}
+
 ss::future<txn_offset_commit_response>
 group::store_txn_offsets(txn_offset_commit_request r) {
     // replaying the log, the term isn't set yet
@@ -2052,8 +2089,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
             co_await _partition->raft()->step_down(
               "group store_txn_offsets failed");
         }
-        co_return txn_offset_commit_response(
-          r, error_code::unknown_server_error);
+        auto tx_ec = map_tx_replication_error(result.error());
+
+        co_return txn_offset_commit_response(r, map_tx_errc(tx_ec));
     }
 
     it = _producers.find(pid.get_id());
@@ -2852,23 +2890,23 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
       std::move(tx));
 
     auto reader = model::make_memory_record_batch_reader(std::move(batch));
-    auto e = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
       _term,
       std::move(reader),
       raft::replicate_options(raft::consistency_level::quorum_ack));
 
-    if (!e) {
+    if (!result) {
         vlog(
           _ctx_txlog.warn,
           "Error \"{}\" on replicating pid:{} abort batch",
-          e.error(),
+          result.error(),
           pid);
         if (
           _partition->raft()->is_leader()
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group do abort failed");
         }
-        co_return make_abort_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
     }
     auto it = _producers.find(pid.get_id());
     if (it != _producers.end()) {
@@ -2948,7 +2986,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group tx commit failed");
         }
-        co_return make_commit_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
     }
 
     it = _producers.find(pid.get_id());
diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h
index cacccf302e8a..a78857bd7ce3 100644
--- a/src/v/kafka/server/group.h
+++ b/src/v/kafka/server/group.h
@@ -896,6 +896,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
           features::feature::group_tx_fence_dedicated_batch_type);
     }
 
+    cluster::tx::errc map_tx_replication_error(std::error_code ec);
+
     kafka::group_id _id;
     group_state _state;
     std::optional<model::timestamp> _state_timestamp;