[v24.2.x] k/group: fixed mapping of transactional control batch replication errors #22624

Merged
60 changes: 49 additions & 11 deletions src/v/kafka/server/group.cc
@@ -28,6 +28,7 @@
#include "kafka/protocol/sync_group.h"
#include "kafka/protocol/txn_offset_commit.h"
#include "kafka/protocol/wire.h"
#include "kafka/server/errors.h"
#include "kafka/server/group_metadata.h"
#include "kafka/server/logger.h"
#include "kafka/server/member.h"
@@ -1863,23 +1864,23 @@ group::begin_tx(cluster::begin_group_tx_request r) {
model::record_batch batch = make_tx_fence_batch(
r.pid, std::move(fence), use_dedicated_batch_type_for_fence());
auto reader = model::make_memory_record_batch_reader(std::move(batch));
-    auto res = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

-    if (!res) {
+    if (!result) {
vlog(
_ctx_txlog.warn,
"begin tx request {} failed - error replicating fencing batch - {}",
r,
-          res.error().message());
+          result.error().message());
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
-        co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
+        co_return map_tx_replication_error(result.error());
}
auto [producer_it, _] = _producers.try_emplace(
r.pid.get_id(), r.pid.get_epoch());
@@ -1965,6 +1966,42 @@ group::abort_tx(cluster::abort_group_tx_request r) {
co_return co_await do_abort(r.group_id, r.pid, r.tx_seq);
}

+cluster::tx::errc group::map_tx_replication_error(std::error_code ec) {
+    auto result_ec = cluster::tx::errc::none;
+    // All generic errors are mapped to not_coordinator to force the client
+    // to retry, while errors like timeout and shutdown are mapped to timeout
+    // to indicate that the outcome of the operation is uncertain.
+    if (ec.category() == raft::error_category()) {
+        switch (static_cast<raft::errc>(ec.value())) {
+        case raft::errc::shutting_down:
+        case raft::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else if (ec.category() == cluster::error_category()) {
+        switch (static_cast<cluster::errc>(ec.value())) {
+        case cluster::errc::shutting_down:
+        case cluster::errc::timeout:
+            result_ec = cluster::tx::errc::timeout;
+            break;
+        default:
+            result_ec = cluster::tx::errc::not_coordinator;
+        }
+    } else {
+        vlog(_ctx_txlog.warn, "unexpected replication error: {}", ec);
+        result_ec = cluster::tx::errc::not_coordinator;
+    }
+
+    vlog(
+      _ctx_txlog.info,
+      "transactional batch replication error: {}, mapped to: {}",
+      ec,
+      result_ec);
+    return result_ec;
+}
+
ss::future<txn_offset_commit_response>
group::store_txn_offsets(txn_offset_commit_request r) {
// replaying the log, the term isn't set yet
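The new function dispatches on the error category before interpreting the integer value, since the same numeric value means different things in raft::error_category() and cluster::error_category(). Below is a minimal, self-contained sketch of that pattern; the enum values and category type here are hypothetical stand-ins, not Redpanda's actual definitions.

// Standalone sketch (not Redpanda source): dispatch on
// std::error_code::category() before reading ec.value(). "raft_errc" and
// raft_category() are hypothetical stand-ins for raft::errc and
// raft::error_category().
#include <iostream>
#include <string>
#include <system_error>

enum class raft_errc { success = 0, timeout = 1, shutting_down = 2, other = 3 };
enum class tx_errc { none, timeout, not_coordinator };

class raft_category_impl final : public std::error_category {
public:
    const char* name() const noexcept override { return "raft"; }
    std::string message(int v) const override {
        return "raft error " + std::to_string(v);
    }
};

const std::error_category& raft_category() {
    static raft_category_impl instance;
    return instance;
}

std::error_code make_raft_error(raft_errc e) {
    return {static_cast<int>(e), raft_category()};
}

// Same shape as map_tx_replication_error above: the category identifies the
// error domain, and only then is the integer value interpreted. Ambiguous
// outcomes (timeout, shutdown) map to timeout; everything else forces
// coordinator rediscovery via not_coordinator.
tx_errc map_replication_error(std::error_code ec) {
    if (ec.category() == raft_category()) {
        switch (static_cast<raft_errc>(ec.value())) {
        case raft_errc::timeout:
        case raft_errc::shutting_down:
            return tx_errc::timeout;
        default:
            return tx_errc::not_coordinator;
        }
    }
    return tx_errc::not_coordinator;
}

int main() {
    std::cout << (map_replication_error(make_raft_error(raft_errc::timeout))
                  == tx_errc::timeout)
              << '\n'; // prints 1
}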
@@ -2052,8 +2089,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
-        co_return txn_offset_commit_response(
-          r, error_code::unknown_server_error);
+        auto tx_ec = map_tx_replication_error(result.error());
+
+        co_return txn_offset_commit_response(r, map_tx_errc(tx_ec));
}

it = _producers.find(pid.get_id());
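Note that store_txn_offsets translates twice: the replication std::error_code first becomes a cluster::tx::errc, and map_tx_errc then converts that to the Kafka protocol error returned to the client. A rough sketch of the second step follows; the Kafka-side values are illustrative assumptions, not the actual map_tx_errc table in kafka/server/errors.h.

// Hypothetical sketch of the tx-errc -> Kafka protocol error step; the
// concrete Kafka-side values are assumptions for illustration only.
enum class tx_errc { none, timeout, not_coordinator };
enum class kafka_error { none, request_timed_out, not_coordinator };

kafka_error map_tx_errc_sketch(tx_errc e) {
    switch (e) {
    case tx_errc::timeout:
        // The client cannot know whether the batch was replicated; it must
        // retry, and the outcome remains ambiguous until the retry resolves.
        return kafka_error::request_timed_out;
    case tx_errc::not_coordinator:
        // Forces the client to refresh coordinator metadata and retry.
        return kafka_error::not_coordinator;
    default:
        return kafka_error::none;
    }
}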
@@ -2852,23 +2890,23 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
std::move(tx));
auto reader = model::make_memory_record_batch_reader(std::move(batch));

-    auto e = co_await _partition->raft()->replicate(
+    auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

-    if (!e) {
+    if (!result) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} abort batch",
-          e.error(),
+          result.error(),
pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group do abort failed");
}
-        co_return make_abort_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
}
auto it = _producers.find(pid.get_id());
if (it != _producers.end()) {
@@ -2948,7 +2986,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group tx commit failed");
}
-        co_return make_commit_tx_reply(cluster::tx::errc::timeout);
+        co_return map_tx_replication_error(result.error());
}

it = _producers.find(pid.get_id());
2 changes: 2 additions & 0 deletions src/v/kafka/server/group.h
@@ -896,6 +896,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
features::feature::group_tx_fence_dedicated_batch_type);
}

+    cluster::tx::errc map_tx_replication_error(std::error_code ec);
+
kafka::group_id _id;
group_state _state;
std::optional<model::timestamp> _state_timestamp;