From 93a0c7a03115d28bcfca893eff5d697e495cf809 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Mon, 29 Jul 2024 07:05:54 +0000
Subject: [PATCH] k/group: fixed mapping of transactional control batch
 replication errors
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When replication of a transactional control batch fails, the group
coordinator steps down and returns an error to the transaction
coordinator.

This patch fixes the incorrect mapping of replication errors, which
could lead to returning an `unknown_server_error` to the client. With
the new mapping policy the error handling is consistent for all
replicated batches: generic errors are translated to `not_coordinator`,
while errors indicating an uncertain outcome, such as shutting down and
timeout, are translated to a timeout to let the client know that the
outcome of the operation must be verified.
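In isolation the policy looks roughly like this (a standalone,
compilable sketch with simplified stand-in enums; the real
implementation, group::map_tx_replication_error in the diff below,
switches on raft::errc and cluster::errc instead):

    #include <iostream>

    // Stand-ins for raft::errc / cluster::errc and cluster::tx::errc.
    enum class replication_errc { not_leader, shutting_down, timeout };
    enum class tx_errc { timeout, not_coordinator };

    tx_errc map_tx_replication_error(replication_errc ec) {
        switch (ec) {
        // Uncertain outcome: the control batch may still have been
        // replicated, so the client has to verify before retrying.
        case replication_errc::shutting_down:
        case replication_errc::timeout:
            return tx_errc::timeout;
        // Any other error is a definite failure: force the client to
        // re-discover the coordinator and retry.
        default:
            return tx_errc::not_coordinator;
        }
    }

    int main() {
        std::cout << (map_tx_replication_error(replication_errc::timeout)
                      == tx_errc::timeout)
                  << "\n"; // prints 1
    }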
Signed-off-by: Michał Maślanka
---
 src/v/kafka/server/group.cc | 60 ++++++++++++++++++++++++++++++-------
 src/v/kafka/server/group.h  |  2 ++
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index 48dc8d3ecae2..03b8201f637f 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -28,6 +28,7 @@
 #include "kafka/protocol/sync_group.h"
 #include "kafka/protocol/txn_offset_commit.h"
 #include "kafka/protocol/wire.h"
+#include "kafka/server/errors.h"
 #include "kafka/server/group_metadata.h"
 #include "kafka/server/logger.h"
 #include "kafka/server/member.h"
@@ -1863,23 +1864,23 @@ group::begin_tx(cluster::begin_group_tx_request r) {
     model::record_batch batch = make_tx_fence_batch(
       r.pid, std::move(fence), use_dedicated_batch_type_for_fence());
     auto reader = model::make_memory_record_batch_reader(std::move(batch));
-    auto res = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
       _term,
       std::move(reader),
       raft::replicate_options(raft::consistency_level::quorum_ack));
-    if (!res) {
+    if (!result) {
         vlog(
           _ctx_txlog.warn,
           "begin tx request {} failed - error replicating fencing batch - {}",
           r,
-          res.error().message());
+          result.error().message());
         if (
           _partition->raft()->is_leader()
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group begin_tx failed");
         }
-        co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
+        co_return map_tx_replication_error(result.error());
     }
     auto [producer_it, _] = _producers.try_emplace(
       r.pid.get_id(), r.pid.get_epoch());
@@ -1965,6 +1966,42 @@ group::abort_tx(cluster::abort_group_tx_request r) {
     co_return co_await do_abort(r.group_id, r.pid, r.tx_seq);
 }
 
+cluster::tx::errc group::map_tx_replication_error(std::error_code ec) {
+    auto result_ec = cluster::tx::errc::none;
+    // All generic errors are mapped to not_coordinator to force the client
+    // to retry; errors like timeout and shutting down are mapped to timeout
+    // to indicate the uncertainty of the operation outcome.
+    if (ec.category() == raft::error_category()) {
+        switch (static_cast<raft::errc>(ec.value())) {
+        case raft::errc::shutting_down:
+        case raft::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else if (ec.category() == cluster::error_category()) {
+        switch (static_cast<cluster::errc>(ec.value())) {
+        case cluster::errc::shutting_down:
+        case cluster::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else {
+        vlog(_ctx_txlog.warn, "unexpected replication error: {}", ec);
+        result_ec = cluster::tx::errc::not_coordinator;
+    }
+
+    vlog(
+      _ctx_txlog.info,
+      "transactional batch replication error: {}, mapped to: {}",
+      ec,
+      result_ec);
+    return result_ec;
+}
+
 ss::future<txn_offset_commit_response>
 group::store_txn_offsets(txn_offset_commit_request r) {
     // replaying the log, the term isn't set yet
@@ -2052,8 +2089,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
             co_await _partition->raft()->step_down(
               "group store_txn_offsets failed");
         }
-        co_return txn_offset_commit_response(
-          r, error_code::unknown_server_error);
+        auto tx_ec = map_tx_replication_error(result.error());
+
+        co_return txn_offset_commit_response(r, map_tx_errc(tx_ec));
     }
 
     it = _producers.find(pid.get_id());
@@ -2852,23 +2890,23 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
       std::move(tx));
     auto reader = model::make_memory_record_batch_reader(std::move(batch));
-    auto e = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
       _term,
       std::move(reader),
       raft::replicate_options(raft::consistency_level::quorum_ack));
-    if (!e) {
+    if (!result) {
         vlog(
           _ctx_txlog.warn,
           "Error \"{}\" on replicating pid:{} abort batch",
-          e.error(),
+          result.error(),
           pid);
         if (
           _partition->raft()->is_leader()
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group do abort failed");
         }
-        co_return make_abort_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
     }
     auto it = _producers.find(pid.get_id());
     if (it != _producers.end()) {
@@ -2948,7 +2986,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
           && _partition->raft()->term() == _term) {
             co_await _partition->raft()->step_down("group tx commit failed");
         }
-        co_return make_commit_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
     }
 
     it = _producers.find(pid.get_id());
diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h
index cacccf302e8a..a78857bd7ce3 100644
--- a/src/v/kafka/server/group.h
+++ b/src/v/kafka/server/group.h
@@ -896,6 +896,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
           features::feature::group_tx_fence_dedicated_batch_type);
     }
 
+    cluster::tx::errc map_tx_replication_error(std::error_code ec);
+
     kafka::group_id _id;
     group_state _state;
     std::optional<model::timestamp> _state_timestamp;
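For context, this is how a caller is expected to treat the two mapped
error classes (a hypothetical, self-contained sketch, not part of the
patch; the function names are invented for illustration):

    #include <iostream>

    enum class tx_errc { none, timeout, not_coordinator };

    // Stubs standing in for real coordinator re-discovery and
    // transaction-outcome verification.
    void refresh_coordinator_and_retry() { std::cout << "retry\n"; }
    void verify_transaction_outcome() { std::cout << "verify\n"; }

    void on_group_tx_error(tx_errc ec) {
        switch (ec) {
        case tx_errc::none:
            break; // success, nothing to do
        case tx_errc::not_coordinator:
            // Definite failure: safe to resend after refreshing the
            // coordinator.
            refresh_coordinator_and_retry();
            break;
        case tx_errc::timeout:
            // Uncertain outcome: the control batch may have been
            // written; verify before retrying or giving up.
            verify_transaction_outcome();
            break;
        }
    }

    int main() {
        on_group_tx_error(tx_errc::not_coordinator); // prints "retry"
        on_group_tx_error(tx_errc::timeout);         // prints "verify"
    }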