k/group: fixed mapping of transactional control batch replication errors
When replication of a transactional control batch fails, the group coordinator
steps down and returns an error to the transaction coordinator. Fixed the
incorrect mapping of these replication errors, which could lead to returning an
`unknown_server_error` to the client.

With the new mapping policy, error handling is consistent for all replication
batches. All errors are translated to `not_coordinator_error`, except errors
indicating an uncertain outcome, such as shutting down and timeout, which are
translated to a timeout so the client knows it must verify the outcome of the
operation.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
mmaslankaprv committed Jul 29, 2024
1 parent 711fb12 commit 93a0c7a
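
The mapping policy described above can be illustrated with a small, self-contained C++ sketch. It is illustrative only: the enum and error values below are simplified stand-ins, while the actual change switches over raft::errc and cluster::errc inside group::map_tx_replication_error (see the diff).

// Illustrative sketch of the mapping policy, not the committed code.
#include <system_error>

enum class tx_error { timeout, not_coordinator };

tx_error map_replication_error(std::error_code ec) {
    // Shutdown and timeout mean the control batch may or may not have been
    // replicated, so the outcome is uncertain: report a timeout and let the
    // client verify the result of the transaction.
    if (ec == std::errc::timed_out || ec == std::errc::operation_canceled) {
        return tx_error::timeout;
    }
    // Every other replication error is treated as retriable: report
    // not_coordinator so the client re-discovers the group coordinator and
    // retries, instead of surfacing an unknown_server_error.
    return tx_error::not_coordinator;
}

In the diff below, the same rule replaces the previous per-call-site choices: begin_tx returned leader_not_found, store_txn_offsets returned unknown_server_error, and do_abort/do_commit returned timeout regardless of the cause.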
Showing 2 changed files with 51 additions and 11 deletions.
60 changes: 49 additions & 11 deletions src/v/kafka/server/group.cc
@@ -28,6 +28,7 @@
#include "kafka/protocol/sync_group.h"
#include "kafka/protocol/txn_offset_commit.h"
#include "kafka/protocol/wire.h"
#include "kafka/server/errors.h"
#include "kafka/server/group_metadata.h"
#include "kafka/server/logger.h"
#include "kafka/server/member.h"
@@ -1863,23 +1864,23 @@ group::begin_tx(cluster::begin_group_tx_request r) {
model::record_batch batch = make_tx_fence_batch(
r.pid, std::move(fence), use_dedicated_batch_type_for_fence());
auto reader = model::make_memory_record_batch_reader(std::move(batch));
auto res = co_await _partition->raft()->replicate(
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!res) {
if (!result) {
vlog(
_ctx_txlog.warn,
"begin tx request {} failed - error replicating fencing batch - {}",
r,
res.error().message());
result.error().message());
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
co_return map_tx_replication_error(result.error());
}
auto [producer_it, _] = _producers.try_emplace(
r.pid.get_id(), r.pid.get_epoch());
@@ -1965,6 +1966,42 @@ group::abort_tx(cluster::abort_group_tx_request r) {
co_return co_await do_abort(r.group_id, r.pid, r.tx_seq);
}

cluster::tx::errc group::map_tx_replication_error(std::error_code ec) {
auto result_ec = cluster::tx::errc::none;
// All generic errors are mapped to not coordinator to force the client to
// retry; errors like timeout and shutdown are mapped to timeout to indicate
// the uncertainty of the operation outcome.
if (ec.category() == raft::error_category()) {
switch (static_cast<raft::errc>(ec.value())) {
case raft::errc::shutting_down:
case raft::errc::timeout:
result_ec = cluster::tx::errc::timeout;
break;
default:
result_ec = cluster::tx::errc::not_coordinator;
}
} else if (ec.category() == cluster::error_category()) {
switch (static_cast<cluster::errc>(ec.value())) {
case cluster::errc::shutting_down:
case cluster::errc::timeout:
result_ec = cluster::tx::errc::timeout;
break;
default:
result_ec = cluster::tx::errc::not_coordinator;
}
} else {
vlog(_ctx_txlog.warn, "unexpected replication error: {}", ec);
result_ec = cluster::tx::errc::not_coordinator;
}

vlog(
_ctx_txlog.info,
"transactional batch replication error: {}, mapped to: {}",
ec,
result_ec);
return result_ec;
}

ss::future<txn_offset_commit_response>
group::store_txn_offsets(txn_offset_commit_request r) {
// replaying the log, the term isn't set yet
@@ -2052,8 +2089,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
co_return txn_offset_commit_response(
r, error_code::unknown_server_error);
auto tx_ec = map_tx_replication_error(result.error());

co_return txn_offset_commit_response(r, map_tx_errc(tx_ec));
}

it = _producers.find(pid.get_id());
@@ -2852,23 +2890,23 @@ ss::future<cluster::abort_group_tx_reply> group::do_abort(
std::move(tx));
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto e = co_await _partition->raft()->replicate(
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (!result) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} abort batch",
e.error(),
result.error(),
pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group do abort failed");
}
co_return make_abort_tx_reply(cluster::tx::errc::timeout);
co_return map_tx_replication_error(result.error());
}
auto it = _producers.find(pid.get_id());
if (it != _producers.end()) {
@@ -2948,7 +2986,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group tx commit failed");
}
co_return make_commit_tx_reply(cluster::tx::errc::timeout);
co_return map_tx_replication_error(result.error());
}

it = _producers.find(pid.get_id());
2 changes: 2 additions & 0 deletions src/v/kafka/server/group.h
@@ -896,6 +896,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
features::feature::group_tx_fence_dedicated_batch_type);
}

cluster::tx::errc map_tx_replication_error(std::error_code ec);

kafka::group_id _id;
group_state _state;
std::optional<model::timestamp> _state_timestamp;
