Skip to content

Commit

Permalink
Merge pull request #22624 from vbotbuildovich/backport-pr-22618-v24.2…
Browse files Browse the repository at this point in the history
….x-646

[v24.2.x] k/group: fixed mapping of transactional control batch replication errors
  • Loading branch information
piyushredpanda authored Jul 29, 2024
2 parents c49772d + 70cc140 commit f758a43
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
60 changes: 49 additions & 11 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "kafka/protocol/sync_group.h"
#include "kafka/protocol/txn_offset_commit.h"
#include "kafka/protocol/wire.h"
#include "kafka/server/errors.h"
#include "kafka/server/group_metadata.h"
#include "kafka/server/logger.h"
#include "kafka/server/member.h"
Expand Down Expand Up @@ -1863,23 +1864,23 @@ group::begin_tx(cluster::begin_group_tx_request r) {
model::record_batch batch = make_tx_fence_batch(
r.pid, std::move(fence), use_dedicated_batch_type_for_fence());
auto reader = model::make_memory_record_batch_reader(std::move(batch));
auto res = co_await _partition->raft()->replicate(
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!res) {
if (!result) {
vlog(
_ctx_txlog.warn,
"begin tx request {} failed - error replicating fencing batch - {}",
r,
res.error().message());
result.error().message());
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
co_return map_tx_replication_error(result.error());
}
auto [producer_it, _] = _producers.try_emplace(
r.pid.get_id(), r.pid.get_epoch());
Expand Down Expand Up @@ -1965,6 +1966,42 @@ group::abort_tx(cluster::abort_group_tx_request r) {
co_return co_await do_abort(r.group_id, r.pid, r.tx_seq);
}

/// Translates the error code returned by a failed replication of a
/// transactional batch into a transaction coordinator error code.
///
/// Timeout and shutdown errors (from either the raft or the cluster error
/// category) are reported as `timeout`, signalling that the outcome of the
/// operation is uncertain. Every other error, including errors from an
/// unexpected category, is reported as `not_coordinator` so that the client
/// is forced to retry.
///
/// @param ec error code produced by the failed replicate call
/// @return the transaction error code the caller should report
cluster::tx::errc group::map_tx_replication_error(std::error_code ec) {
    // Default for all generic errors; only overridden for the uncertain
    // (timeout / shutting down) outcomes below.
    cluster::tx::errc mapped = cluster::tx::errc::not_coordinator;

    if (ec.category() == raft::error_category()) {
        const auto raft_ec = static_cast<raft::errc>(ec.value());
        const bool outcome_uncertain = raft_ec == raft::errc::shutting_down
                                       || raft_ec == raft::errc::timeout;
        if (outcome_uncertain) {
            mapped = cluster::tx::errc::timeout;
        }
    } else if (ec.category() == cluster::error_category()) {
        const auto cluster_ec = static_cast<cluster::errc>(ec.value());
        const bool outcome_uncertain
          = cluster_ec == cluster::errc::shutting_down
            || cluster_ec == cluster::errc::timeout;
        if (outcome_uncertain) {
            mapped = cluster::tx::errc::timeout;
        }
    } else {
        // Neither raft nor cluster category: log it, keep the default.
        vlog(_ctx_txlog.warn, "unexpected replication error: {}", ec);
    }

    vlog(
      _ctx_txlog.info,
      "transactional batch replication error: {}, mapped to: {}",
      ec,
      mapped);
    return mapped;
}

ss::future<txn_offset_commit_response>
group::store_txn_offsets(txn_offset_commit_request r) {
// replaying the log, the term isn't set yet
Expand Down Expand Up @@ -2052,8 +2089,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
co_return txn_offset_commit_response(
r, error_code::unknown_server_error);
auto tx_ec = map_tx_replication_error(result.error());

co_return txn_offset_commit_response(r, map_tx_errc(tx_ec));
}

it = _producers.find(pid.get_id());
Expand Down Expand Up @@ -2852,23 +2890,23 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
std::move(tx));
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto e = co_await _partition->raft()->replicate(
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (!result) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} abort batch",
e.error(),
result.error(),
pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group do abort failed");
}
co_return make_abort_tx_reply(cluster::tx::errc::timeout);
co_return map_tx_replication_error(result.error());
}
auto it = _producers.find(pid.get_id());
if (it != _producers.end()) {
Expand Down Expand Up @@ -2948,7 +2986,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group tx commit failed");
}
co_return make_commit_tx_reply(cluster::tx::errc::timeout);
co_return map_tx_replication_error(result.error());
}

it = _producers.find(pid.get_id());
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
features::feature::group_tx_fence_dedicated_batch_type);
}

cluster::tx::errc map_tx_replication_error(std::error_code ec);

kafka::group_id _id;
group_state _state;
std::optional<model::timestamp> _state_timestamp;
Expand Down

0 comments on commit f758a43

Please sign in to comment.