From f63c36cb36a14875c1b31b6ca08409218f5f10eb Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 5 Oct 2023 16:13:11 +0200 Subject: [PATCH 1/7] c/config_manager: populate configuration status on members notification When member is added to the cluster `cluster::configuration_manager` should be immediately aware of it instead of waiting for the first status update. Wired up cluster member added notification to update cluster configuration reconciliation status with initial state for joining member. Fixes: #13497 Fixes: #13503 Signed-off-by: Michal Maslanka (cherry picked from commit 57c8d7a639b2548a55c20017b3e6f6855389961e) --- src/v/cluster/config_manager.cc | 19 ++++++++++++++----- src/v/cluster/config_manager.h | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index e24915be744f6..d0b23804b0f0d 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -18,9 +18,11 @@ #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/partition_leaders_table.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/node_config.h" #include "features/feature_table.h" +#include "model/metadata.h" #include "resource_mgmt/io_priority.h" #include "rpc/connection_cache.h" #include "utils/file_io.h" @@ -81,7 +83,7 @@ config_manager::config_manager( /** * Register notification immediately not to lose status updates. */ - _member_removed_notification + _member_update_notification = _members.local().register_members_updated_notification( [this](model::node_id id, model::membership_state new_state) { handle_cluster_members_update(id, new_state); @@ -229,17 +231,24 @@ ss::future<> config_manager::start() { } void config_manager::handle_cluster_members_update( model::node_id id, model::membership_state new_state) { - if (new_state != model::membership_state::removed) { - return; + vlog( + clusterlog.debug, + "Processing membership notification: {{id: {} state: {}}}", + id, + new_state); + if (new_state == model::membership_state::active) { + // add an empty status placeholder if node is not yet known + status.try_emplace(id, config_status{.node = id}); + } else if (new_state == model::membership_state::removed) { + status.erase(id); } - status.erase(id); } ss::future<> config_manager::stop() { vlog(clusterlog.info, "Stopping Config Manager..."); _reconcile_wait.broken(); _members.local().unregister_members_updated_notification( - _member_removed_notification); + _member_update_notification); _leaders.local().unregister_leadership_change_notification( _raft0_leader_changed_notification); co_await _gate.close(); diff --git a/src/v/cluster/config_manager.h b/src/v/cluster/config_manager.h index bd3e53e1f445e..3554a4b35f2e3 100644 --- a/src/v/cluster/config_manager.h +++ b/src/v/cluster/config_manager.h @@ -135,7 +135,7 @@ class config_manager final { ss::sharded& _leaders; ss::sharded& _feature_table; ss::sharded& _members; - notification_id_type _member_removed_notification; + notification_id_type _member_update_notification; notification_id_type _raft0_leader_changed_notification; ss::condition_variable _reconcile_wait; From b7ddcf0857d17af719c7cb2a673d65c24f024478 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 20 Nov 2023 16:16:03 +0100 Subject: [PATCH 2/7] r/consensus: log configuration replace event Signed-off-by: Michal Maslanka (cherry picked from commit cb4e6075344b50cc3606e4d7946a4ad985404c4c) --- src/v/raft/consensus.cc | 10 
++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index c8dc032d4819f..442b2b6a05ef2 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1115,11 +1115,17 @@ consensus::remove_member(vnode node, model::revision_id new_revision) { ss::future consensus::replace_configuration( std::vector nodes, model::revision_id new_revision) { - return change_configuration([nodes = std::move(nodes), new_revision]( + return change_configuration([this, nodes = std::move(nodes), new_revision]( group_configuration current) mutable { + auto old = current; + current.set_version(raft::group_configuration::v_5); current.replace(nodes, new_revision); - + vlog( + _ctxlog.debug, + "Replacing current configuration: {} with new configuration: {}", + old, + current); return result{std::move(current)}; }); } From e6195754c7c326a489c06698a864d22a5f0fd156 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 20 Nov 2023 16:42:47 +0100 Subject: [PATCH 3/7] r/recovery_stm: stop recovery when follower was already updated Recovery and replicate stms are not synchronized. It may be the case when both of stms are active at the same time that the same batch is delivered to the follower twice. In general this batch duplication is harmless as Raft is not vulnerable for messages redelivery but it may cause unnecessary truncation and latency increase. Added a check validating expected log end offset right before sending recovery append entries request. This will prevent sending the same set of batches twice to the follower. Fixes: #14413 Signed-off-by: Michal Maslanka (cherry picked from commit 7bb54d3c5e747f0809e7d9fc74a36e2a8a34e25a) --- src/v/raft/recovery_stm.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc index 534752562b341..af8f85db02fd2 100644 --- a/src/v/raft/recovery_stm.cc +++ b/src/v/raft/recovery_stm.cc @@ -445,6 +445,18 @@ ss::future<> recovery_stm::replicate( _stop_requested = true; return ss::now(); } + if (meta.value()->expected_log_end_offset >= _last_batch_offset) { + vlog( + _ctxlog.trace, + "follower expected log end offset is already updated, stopping " + "recovery. Expected log end offset: {}, recovery range last offset: " + "{}", + meta.value()->expected_log_end_offset, + _last_batch_offset); + + _stop_requested = true; + return ss::now(); + } /** * Update follower expected log end. It is equal to the last batch in a set * of batches read for this recovery round. 
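The guard introduced in PATCH 3/7 can be read in isolation as a simple early-exit condition: if the follower's expected log end offset already covers the last offset of the batches read for this recovery round, sending them again is redundant. Below is a minimal standalone sketch of that condition only, using simplified, assumed names (follower_metadata, should_stop_recovery); the real recovery_stm tracks considerably more state, so this is an illustration rather than the actual implementation.

// Minimal sketch of the early-exit check, with assumed/simplified types.
#include <cstdint>
#include <optional>

struct follower_metadata {
    // Highest log end offset the leader already expects the follower to reach
    // once in-flight append entries requests are applied.
    int64_t expected_log_end_offset{-1};
};

// Returns true when a recovery round can stop early: the follower is already
// expected to hold everything up to last_batch_offset, e.g. because the
// replicate path delivered the same batches concurrently.
bool should_stop_recovery(
  const std::optional<follower_metadata>& meta, int64_t last_batch_offset) {
    return meta.has_value()
           && meta->expected_log_end_offset >= last_batch_offset;
}

In the patch this check runs right before dispatching the recovery append entries request and sets _stop_requested, which avoids the duplicate delivery (and the resulting truncation and extra latency) described in the commit message.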
From f90845415ce31e6ac84e2803f498867d3e499b88 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 31 Oct 2023 11:39:40 +0100 Subject: [PATCH 4/7] c/types: fixed to string operators for some transactional types Some of the operators were incorrectly declared while all of them were lacking definition Signed-off-by: Michal Maslanka (cherry picked from commit f65999a837b1ffd82f53dfe2137d049492e832af) --- src/v/cluster/types.cc | 32 ++++++++++++++++++++++++++++++++ src/v/cluster/types.h | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index d1b8c2ad739e6..2c532944ab17e 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1083,6 +1083,38 @@ std::ostream& operator<<(std::ostream& o, const nt_revision& ntr) { return o; } +std::ostream& operator<<(std::ostream& o, const fetch_tx_reply& r) { + fmt::print( + o, + "{{ec: {}, pid: {}, last_pid: {}, tx_seq: {}, timeout_ms: {}, status: " + "{}, partitions: {}, groups: {}}}", + r.ec, + r.pid, + r.last_pid, + r.tx_seq, + r.timeout_ms.count(), + r.status, + r.partitions, + r.groups); + return o; +} + +std::ostream& operator<<(std::ostream& o, const fetch_tx_reply::tx_group& g) { + fmt::print(o, "{{etag: {}, group_id: {}}}", g.etag, g.group_id); + return o; +} + +std::ostream& +operator<<(std::ostream& o, const fetch_tx_reply::tx_partition& p) { + fmt::print( + o, + "{{etag: {}, ntp: {}, revision: {}}}", + p.etag, + p.ntp, + p.topic_revision); + + return o; +} } // namespace cluster namespace reflection { diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 4c0aacd66c3a9..e31fbfdbbc6c9 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -425,7 +425,7 @@ struct fetch_tx_reply friend bool operator==(const tx_group&, const tx_group&) = default; - friend std::ostream& operator<<(std::ostream& o, const tx_partition& r); + friend std::ostream& operator<<(std::ostream& o, const tx_group& r); auto serde_fields() { return std::tie(group_id, etag); } }; From 9d630ba7bc569bc5385738e0d365e20d369037a4 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 3 Nov 2023 15:05:59 +0100 Subject: [PATCH 5/7] tx: improved logging while processing transactions Signed-off-by: Michal Maslanka (cherry picked from commit b8f63a76bc1f9943503a61951321381a6f62013a) --- src/v/cluster/tm_stm.cc | 197 +++++--- src/v/cluster/tm_stm_cache.cc | 43 +- src/v/cluster/tx_gateway_frontend.cc | 703 +++++++++++++++++---------- src/v/cluster/types.cc | 7 + src/v/cluster/types.h | 1 + 5 files changed, 613 insertions(+), 338 deletions(-) diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index bb9ee15d0ea6c..c3ff3fba2e4ed 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -159,6 +159,11 @@ ss::future tm_stm::try_init_hosted_transactions( ss::future tm_stm::include_hosted_transaction( model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] including hosted transactions in term: {}", + tx_id, + term); if (!_hosted_txes.inited) { co_return op_status::unknown; } @@ -174,6 +179,11 @@ ss::future tm_stm::include_hosted_transaction( ss::future tm_stm::exclude_hosted_transaction( model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] excluding hosted transactions in term: {}", + tx_id, + term); if (!_hosted_txes.inited) { co_return op_status::unknown; } @@ -358,7 +368,8 @@ ss::future tm_stm::do_update_hosted_transactions( offset, model::timeout_clock::now() + _sync_timeout)) { vlog( 
_ctx_log.info, - "timeout on waiting until {} is applied on updating hash_ranges", + "timeout on waiting until {} is applied on updating hosted " + "transactions", offset); if (_c->is_leader() && _c->term() == term) { co_await _c->step_down("txn coordinator apply timeout"); @@ -368,8 +379,8 @@ ss::future tm_stm::do_update_hosted_transactions( if (_c->term() != term) { vlog( _ctx_log.info, - "lost leadership while waiting until {} is applied on updating hash " - "ranges", + "lost leadership while waiting until {} is applied on updating " + "hosted transactions", offset); co_return op_status::unknown; } @@ -384,14 +395,19 @@ tm_stm::update_tx(tm_transaction tx, model::term_id term) { ss::future> tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { + vlog( + _ctx_log.trace, + "[tx_id={}] updating transaction: {} in term: {}", + tx.id, + tx, + term); auto batch = serialize_tx(tx); auto r = co_await replicate_quorum_ack(term, std::move(batch)); if (!r) { vlog( _ctx_log.info, - "got error {} on updating tx:{} pid:{} etag:{} tx_seq:{}", - r.error(), + "[tx_id={}] error updating tx: {} - {}", tx.id, tx.pid, tx.etag, @@ -411,9 +427,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { offset, model::timeout_clock::now() + _sync_timeout)) { vlog( _ctx_log.info, - "timeout on waiting until {} is applied on updating tx:{} pid:{} " - "tx_seq:{}", - offset, + "[tx_id={}] timeout waiting for offset {} to be applied tx: {}", tx.id, tx.pid, tx.tx_seq); @@ -425,12 +439,12 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { if (_c->term() != term) { vlog( _ctx_log.info, - "lost leadership while waiting until {} is applied on updating tx:{} " - "pid:{} tx_seq:{}", - offset, + "[tx_id={}] leadership while waiting until offset {} is applied tx: " + "{}", tx.id, - tx.pid, - tx.tx_seq); + offset, + tx); + co_return tm_stm::op_status::unknown; } @@ -438,10 +452,9 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { if (!tx_opt) { vlog( _ctx_log.warn, - "can't find an updated tx:{} pid:{} tx_seq:{} in the cache", + "[tx_id={}] can't find an updated tx: {} in the cache", tx.id, - tx.pid, - tx.tx_seq); + tx); // update_tx must return conflict only in this case, see expire_tx co_return tm_stm::op_status::conflict; } @@ -466,6 +479,11 @@ tm_stm::mark_tx_preparing( ss::future> tm_stm::mark_tx_aborting( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as aborted in term: {}", + tx_id, + expected_term); auto ptx = co_await get_tx(tx_id); if (!ptx.has_value()) { co_return ptx; @@ -481,13 +499,18 @@ ss::future> tm_stm::mark_tx_aborting( ss::future> tm_stm::mark_tx_prepared( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as prepared in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { vlog( _ctx_log.trace, - "got {} on pulling tx {} to mark it prepared", - tx_opt.error(), - tx_id); + "[tx_id={}] error getting transaction - {}", + tx_id, + tx_opt.error()); co_return tx_opt; } auto tx = tx_opt.value(); @@ -498,10 +521,10 @@ ss::future> tm_stm::mark_tx_prepared( if (tx.status != check_status) { vlog( _ctx_log.warn, - "can't mark tx:{} pid:{} tx_seq:{} prepared wrong status {} != {}", - tx.id, - tx.pid, - tx.tx_seq, + "[tx_id={}] error marking transaction {} as prepared. 
Incorrect " + "status {} != {}", + tx_id, + tx, tx.status, check_status); co_return tm_stm::op_status::conflict; @@ -513,6 +536,11 @@ ss::future> tm_stm::mark_tx_prepared( ss::future> tm_stm::mark_tx_killed( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as killed in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { co_return tx_opt; @@ -530,6 +558,11 @@ ss::future> tm_stm::mark_tx_killed( ss::future> tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] resetting transfer of transaction in term: {}", + tx_id, + term); auto ptx = co_await get_tx(tx_id); if (!ptx.has_value()) { co_return ptx; @@ -541,17 +574,15 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { } vlog( _ctx_log.trace, - "observed a transferring tx:{} pid:{} etag:{} tx_seq:{} in term:{}", + "[tx_id={}] observed a transferring tx: {}, term: {}", tx_id, - tx.pid, - tx.etag, - tx.tx_seq, + tx, term); if (tx.etag == term) { // case 1 - Unlikely, just reset the transferring flag. vlog( _ctx_log.warn, - "tx: {} transferring within same term: {}, resetting.", + "[tx_id={}] transferring within same term: {}, resetting.", tx_id, tx.etag); } @@ -569,6 +600,11 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { ss::future> tm_stm::mark_tx_ongoing( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as ongoing in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { co_return tx_opt; @@ -577,7 +613,7 @@ ss::future> tm_stm::mark_tx_ongoing( if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to update state data tx:{} pid:{} tx_seq:{} etag:{} " + "[tx_id={}] attempt to update state data pid:{} tx_seq:{} etag:{} " "assuming etag is {}", tx.id, tx.pid, @@ -601,7 +637,13 @@ ss::future tm_stm::re_register_producer( std::chrono::milliseconds transaction_timeout_ms, model::producer_identity pid, model::producer_identity last_pid) { - vlog(_ctx_log.trace, "Registering existing tx: id={}, pid={}", tx_id, pid); + vlog( + _ctx_log.trace, + "[tx_id={}] Registering existing transaction with new pid: {}, previous " + "pid: {}", + tx_id, + pid, + last_pid); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { @@ -645,7 +687,12 @@ ss::future tm_stm::do_register_new_producer( kafka::transactional_id tx_id, std::chrono::milliseconds transaction_timeout_ms, model::producer_identity pid) { - vlog(_ctx_log.trace, "Registering new tx: id={}, pid={}", tx_id, pid); + vlog( + _ctx_log.trace, + "[tx_id={}] Registering new transaction pid: {}, term: {}", + tx_id, + pid, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (tx_opt.has_value()) { @@ -696,32 +743,32 @@ ss::future tm_stm::add_partitions( std::vector partitions) { auto tx_opt = find_tx(tx_id); if (!tx_opt) { - vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id); + vlog( + _ctx_log.warn, + "[tx_id={}] unable to find ongoing transaction", + tx_id); + co_return tm_stm::op_status::unknown; } auto tx = tx_opt.value(); if (tx.status != tm_transaction::tx_status::ongoing) { vlog( _ctx_log.warn, - "Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} " - "status:{}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, - tx.status); + "[tx_id={}] expected ongoing transaction, found: {} 
", + tx_id, + tx); + co_return tm_stm::op_status::unknown; } if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to add partitions to tx:{} pid:{} tx_seq:{} etag:{} " - "assuming etag is {}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + "[tx_id={}] adding partition fenced transaction: {} expected term: " + "{}", + tx_id, + tx, expected_term); + co_return tm_stm::op_status::unknown; } @@ -748,6 +795,12 @@ ss::future tm_stm::add_partitions( } tx.last_update_ts = clock_type::now(); _cache->set_mem(tx.etag, tx_id, tx); + vlog( + _ctx_log.trace, + "[tx_id={}] transaction: {} added with etag: {}", + tx_id, + tx, + expected_term); co_return tm_stm::op_status::success; } @@ -759,31 +812,27 @@ ss::future tm_stm::add_group( model::term_id etag) { auto tx_opt = find_tx(tx_id); if (!tx_opt) { - vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id); + vlog( + _ctx_log.trace, + "[tx_id={}] unable to find ongoing transaction", + tx_id); co_return tm_stm::op_status::unknown; } auto tx = tx_opt.value(); if (tx.status != tm_transaction::tx_status::ongoing) { vlog( _ctx_log.warn, - "Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} " - "status:{}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, - tx.status); + "[tx_id={}] expected ongoing transaction, found: {} ", + tx_id, + tx); co_return tm_stm::op_status::unknown; } if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to add group to tx:{} pid:{} tx_seq:{} etag:{} assuming " - "etag is {}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + "[tx_id={}] adding group fenced transaction: {} expected term: {}", + tx_id, + tx, expected_term); co_return tm_stm::op_status::unknown; } @@ -970,16 +1019,21 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) { "broken model::record_batch_type::tm_update. 
expected tx.id {} got: {}", tx.id, tx_id); + vlog( + _ctx_log.trace, + "[tx_id={}] applying transaction: {} in term: {}", + tx.id, + tx, + _insync_term); if (tx.status == tm_transaction::tx_status::tombstone) { _cache->erase_log(tx.id); vlog( _ctx_log.trace, - "erasing {} (tombstone) pid:{} tx_seq:{} etag:{} in term:{} from mem", + "[tx_id={}] erasing (tombstone) transaction: {} in term: {} from " + "memory", tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + tx, _insync_term); _cache->erase_mem(tx.id); _pid_tx_id.erase(tx.pid); @@ -995,16 +1049,12 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) { _cache->erase_mem(tx.id); vlog( _ctx_log.trace, - "erasing {} (log overwrite) pid:{} tx_seq:{} etag:{} from mem in " - "term:{} by pid:{} etag:{} tx_seq:{}", - old_tx.id, - old_tx.pid, - old_tx.tx_seq, - old_tx.etag, + "[tx_id={}] erasing (log overwrite) transaction: {} in term: {} " + "from memory by new transaction: {}", + tx.id, + old_tx, _insync_term, - tx.pid, - tx.etag, - tx.tx_seq); + tx); } } @@ -1162,7 +1212,8 @@ tm_stm::expire_tx(model::term_id term, kafka::transactional_id tx_id) { if (r0.has_value()) { vlog( _ctx_log.error, - "written tombstone should evict tx:{} from the cache", + "[tx_id={}] written tombstone should evict transaction from the " + "cache", tx_id); co_return tm_stm::op_status::unknown; } diff --git a/src/v/cluster/tm_stm_cache.cc b/src/v/cluster/tm_stm_cache.cc index 3bf19c0c020a6..c3afaccc261df 100644 --- a/src/v/cluster/tm_stm_cache.cc +++ b/src/v/cluster/tm_stm_cache.cc @@ -21,6 +21,8 @@ #include #include +#include + #include #include @@ -36,7 +38,7 @@ std::ostream& operator<<(std::ostream& o, const tm_transaction& tx) { std::optional tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { - vlog(txlog.trace, "looking for tx:{} etag:{}", tx_id, term); + vlog(txlog.trace, "[tx_id={}] looking for tx with term: {}", tx_id, term); if (_mem_term && _mem_term.value() == term) { // when a node fetches a tx withing a term it means that it was // elected as a leader with a higher term and the request should @@ -48,9 +50,8 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { if (entry_it == _state.end()) { vlog( txlog.trace, - "looking for tx:{} etag:{}: can't find term:{} in _state", + "[tx_id={}] looking for tx with etag: {}, term not found", tx_id, - term, term); // tm_stm_cache_entry isn't found it means a node memory was wiped // and we can't guess it's last state @@ -72,18 +73,17 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { if (log_it == _log_txes.end()) { vlog( txlog.trace, - "looking for tx:{} etag:{}: can't find tx:{} in log", + "[tx_id={}] looking for tx with etag: {}, can't find tx_id in log", tx_id, - term, - tx_id); + term); return std::nullopt; } if (log_it->second.tx.etag != term) { vlog( txlog.trace, - "looking for tx:{} etag:{}: found a tx with etag:{} pid:{} tx_seq:{} " - "(wrong etag)", + "[tx_id={}] looking for tx with etag: {}: found a tx with etag: {} " + "pid: {} tx_seq: {} (wrong etag)", tx_id, term, log_it->second.tx.etag, @@ -91,7 +91,12 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { log_it->second.tx.tx_seq); return std::nullopt; } - + vlog( + txlog.trace, + "[tx_id={}] found tx with etag: {} - {}", + tx_id, + term, + log_it->second.tx); return log_it->second.tx; } @@ -125,7 +130,7 @@ tm_stm_cache::find_log(kafka::transactional_id tx_id) { void tm_stm_cache::set_log(tm_transaction tx) { vlog( txlog.trace, - "saving tx:{} 
etag:{} pid:{} tx_seq:{} to log", + "[tx_id={}] saving tx with etag: {} pid: {} tx_seq: {} to log", tx.id, tx.etag, tx.pid, @@ -156,7 +161,7 @@ void tm_stm_cache::erase_log(kafka::transactional_id tx_id) { auto& tx = tx_it->second.tx; vlog( txlog.trace, - "erasing tx:{} etag:{} pid:{} tx_seq:{} from log", + "[tx_id= {}] erasing tx with etag: {} pid: {} tx_seq: {} from log", tx.id, tx.etag, tx.pid, @@ -174,11 +179,15 @@ fragmented_vector tm_stm_cache::get_log_transactions() { void tm_stm_cache::set_mem( model::term_id term, kafka::transactional_id tx_id, tm_transaction tx) { - auto entry_it = _state.find(term); - if (entry_it == _state.end()) { - _state[term] = tm_stm_cache_entry{.term = term}; - entry_it = _state.find(term); - } + vlog( + txlog.trace, + "[tx_id={}] setting tx with etag: {} to {}", + tx_id, + term, + tx); + auto [entry_it, _] = _state.try_emplace( + term, tm_stm_cache_entry{.term = term}); + entry_it->second.txes[tx_id] = tx; if (!_mem_term) { @@ -217,7 +226,7 @@ void tm_stm_cache::erase_mem(kafka::transactional_id tx_id) { auto tx = tx_it->second; vlog( txlog.trace, - "erasing tx:{} etag:{} pid:{} tx_seq:{} from mem", + "[tx_id={}] erasing tx with etag: {} pid: {} tx_seq: {} from mem", tx.id, tx.etag, tx.pid, diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 3597b9f0a7e09..de36c3de517e6 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -81,7 +82,7 @@ static tm_transaction as_tx( kafka::transactional_id tx_id, model::term_id term, fetch_tx_reply reply) { vassert( reply.ec == tx_errc::none, - "can't extract a tx from a failed (ec:{}) reply", + "can't extract a tx from a failed (ec: {}) reply", reply.ec); tm_transaction tx; tx.id = tx_id; @@ -386,8 +387,7 @@ ss::future tx_gateway_frontend::do_hosts( vlog( txlog.trace, "[tx_id={}] stm hosts request, partition: " - "{}, " - "result: {}", + "{}, result: {}", tx_id, partition, result); @@ -430,6 +430,12 @@ ss::future tx_gateway_frontend::do_init_hosted_transactions( ss::future tx_gateway_frontend::fetch_tx_locally( kafka::transactional_id tx_id, model::term_id term, model::partition_id tm) { + vlog( + txlog.trace, + "[tx_id={}] fetching transaction locally term: {}, partition: {}", + tx_id, + term, + tm); auto map = [tx_id, term, tm]( tm_stm_cache_manager& cache_manager) -> std::optional { @@ -442,6 +448,9 @@ ss::future tx_gateway_frontend::fetch_tx_locally( auto tx_opt = co_await _tm_stm_cache_manager.map_reduce0( map, std::optional{}, reduce); + vlog( + txlog.trace, "[tx_id={}] fetching transaction result: {}", tx_id, tx_opt); + if (!tx_opt) { co_return fetch_tx_reply(tx_errc::tx_not_found); } @@ -480,18 +489,24 @@ ss::future tx_gateway_frontend::fetch_tx_locally( reply.partitions.reserve(tx.partitions.size()); for (auto& p : tx.partitions) { - reply.partitions.push_back( - fetch_tx_reply::tx_partition(p.ntp, p.etag, p.topic_revision)); + reply.partitions.emplace_back(p.ntp, p.etag, p.topic_revision); } reply.groups.reserve(tx.groups.size()); for (auto& g : tx.groups) { - reply.groups.push_back(fetch_tx_reply::tx_group(g.group_id, g.etag)); + reply.groups.emplace_back(g.group_id, g.etag); } co_return reply; } ss::future> tx_gateway_frontend::fetch_tx( kafka::transactional_id tx_id, model::term_id term, model::partition_id tm) { + vlog( + txlog.trace, + "[tx_id={}] fetching transactions from partition: {} in term: {}", + tx_id, + term, + tm); + auto outcome = 
ss::make_lw_shared< available_promise>>(); ssx::spawn_with_gate(_gate, [this, tx_id, term, outcome, tm] { @@ -527,7 +542,7 @@ ss::future<> tx_gateway_frontend::dispatch_fetch_tx( if (node_id == self) { vlog( txlog.trace, - "fetching tx:{} in term:{} tm:{} from {}", + "[tx_id={}] fetching tx in term: {} partition_id: {} from: {}", tx_id, term, tm, @@ -538,12 +553,13 @@ ss::future<> tx_gateway_frontend::dispatch_fetch_tx( [tx_id, term, outcome, node_id, tm](fetch_tx_reply reply) { vlog( txlog.trace, - "got {} on fetching tx:{} in term:{} tm:{} from {}", - reply.ec, + "[tx_id={}] error fetching tx in term: {} on " + "partition_id {} from {} - {}", tx_id, term, tm, - node_id); + node_id, + reply.ec); if (reply.ec == tx_errc::none) { if (!outcome->available()) { auto tx = as_tx(tx_id, term, reply); @@ -570,7 +586,8 @@ ss::future tx_gateway_frontend::dispatch_fetch_tx( outcome) { vlog( txlog.trace, - "fetching tx:{} in term:{} tm: {} from {}", + "[tx_id={}] dispatching fetch_tx request in term: {} for partition: {} " + "to {}", tx_id, term, tm, @@ -591,21 +608,21 @@ ss::future tx_gateway_frontend::dispatch_fetch_tx( if (r.has_error()) { vlog( txlog.warn, - "got error {} on fetching tx:{} in term:{} from {}", - r.error(), + "[tx_id={}] error fetching tx in term {} from {} - {}", tx_id, term, - target); + target, + r.error()); return fetch_tx_reply(tx_errc::unknown_server_error); } auto reply = r.value(); vlog( txlog.trace, - "got {} on fetching tx:{} in term:{} from {}", - reply.ec, + "[tx_id={}] fetching tx in term {} from {} result: {}", tx_id, term, - target); + target, + r.value()); if (reply.ec == tx_errc::none) { if (!outcome->available()) { auto tx = as_tx(tx_id, term, reply); @@ -767,6 +784,7 @@ ss::future tx_gateway_frontend::do_try_abort( .then([this, stm, pid, tx_seq, timeout]( checked term_opt) { if (!term_opt.has_value()) { + vlog(txlog.debug, "stm barrier error: {}", term_opt.error()); if (term_opt.error() == tm_stm::op_status::not_leader) { return ss::make_ready_future( try_abort_reply{tx_errc::not_coordinator}); @@ -779,7 +797,7 @@ ss::future tx_gateway_frontend::do_try_abort( if (!tx_id_opt) { vlog( txlog.trace, - "can't find tx by pid:{} considering it aborted", + "can't find tx by pid: {} considering it aborted", pid); return ss::make_ready_future( try_abort_reply::make_aborted()); @@ -841,20 +859,22 @@ ss::future tx_gateway_frontend::do_try_abort( if (!term_opt.has_value()) { if (term_opt.error() == tm_stm::op_status::not_leader) { vlog( - txlog.trace, - "this node isn't a leader for tx:{} coordinator", - tx_id); + txlog.trace, "[tx_id={}] this node isn't a coordinator", tx_id); co_return try_abort_reply{tx_errc::not_coordinator}; } vlog( txlog.warn, - "got error {} on sync'ing in-memory state", - term_opt.error(), - tx_id); + "[tx_id={}] error syncing in memory state - {}", + tx_id, + term_opt.error()); co_return try_abort_reply{tx_errc::timeout}; } if (term_opt.value() != term) { + vlog( + txlog.info, + "[tx_id={}] error syncing in memory state, empty term optional", + tx_id); co_return try_abort_reply{tx_errc::not_coordinator}; } @@ -863,7 +883,7 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx_opt.error() == tm_stm::op_status::not_found) { vlog( txlog.trace, - "can't find a tx by id:{} (pid:{} tx_seq:{}) considering it " + "[tx_id={}] can't find a tx (pid: {} tx_seq: {}) considering it " "aborted", tx_id, pid, @@ -872,9 +892,9 @@ ss::future tx_gateway_frontend::do_try_abort( } else { vlog( txlog.warn, - "got error {} on lookin up tx:{}", - tx_opt.error(), - tx_id); 
+ "[tx_id={}] error looking up the transaction - {}", + tx_id, + tx_opt.error()); co_return try_abort_reply{tx_errc::unknown_server_error}; } } @@ -886,9 +906,9 @@ ss::future tx_gateway_frontend::do_try_abort( if (!tx_opt.has_value()) { vlog( txlog.warn, - "got error {} on rehydrating tx:{}", - tx_opt.error(), - tx_id); + "[tx_id={}] error rehydrating transaction - {}", + tx_id, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return try_abort_reply{tx_errc::not_coordinator}; } @@ -896,6 +916,13 @@ ss::future tx_gateway_frontend::do_try_abort( } if (tx.etag > term) { + vlog( + txlog.trace, + "[tx_id={}] fenced aborting transaction, etag: {} is greater than " + "current term: {}", + tx_id, + tx.etag, + term); // tx was written by a future leader meaning current // node can't be a leader co_return try_abort_reply{tx_errc::not_coordinator}; @@ -925,8 +952,8 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx.pid != pid || tx.tx_seq != tx_seq) { vlog( txlog.trace, - "found tx:{} has pid:{} tx_seq:{} (expecting pid:{} tx_seq:{}) " - "considering it aborted", + "[tx_id={}] found tx has pid: {} tx_seq: {} (expecting pid: {} " + "tx_seq: {}) considering it aborted", tx_id, tx.pid, tx.tx_seq, @@ -938,7 +965,8 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx.status == tm_transaction::tx_status::prepared) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} is prepared => considering it committed", + "[tx_id={}] pid: {} tx_seq: {} is prepared => considering it " + "committed", tx_id, tx.pid, tx.tx_seq); @@ -949,7 +977,8 @@ ss::future tx_gateway_frontend::do_try_abort( || tx.status == tm_transaction::tx_status::ready) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} has status:{} => considering it aborted", + "[tx_id={}] pid: {} tx_seq: {} has status: {} => considering it " + "aborted", tx_id, tx.pid, tx.tx_seq, @@ -962,7 +991,8 @@ ss::future tx_gateway_frontend::do_try_abort( || tx.status == tm_transaction::tx_status::ongoing) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} is ongoing => forcing it to be aborted", + "[tx_id={}] pid: {} tx_seq: {} is ongoing => forcing it to be " + "aborted", tx_id, tx.pid, tx.tx_seq); @@ -975,7 +1005,7 @@ ss::future tx_gateway_frontend::do_try_abort( } co_return try_abort_reply::make_aborted(); } else { - vlog(txlog.error, "unknown tx status: {}", tx.status); + vlog(txlog.error, "[tx_id={}] unknown status: {}", tx_id, tx.status); co_return try_abort_reply{tx_errc::unknown_server_error}; } } @@ -1102,7 +1132,7 @@ ss::future tx_gateway_frontend::init_tm_tx_locally( model::partition_id tm) { vlog( txlog.trace, - "[tx_id={}] processing name:init_tm_tx, timeout:{}", + "[tx_id={}] processing name:init_tm_tx, timeout: {}", tx_id, transaction_timeout_ms); @@ -1168,7 +1198,7 @@ ss::future tx_gateway_frontend::init_tm_tx_locally( vlog( txlog.trace, - "[tx_id={}] sending name:init_tm_tx, pid:{}, ec: {}", + "[tx_id={}] sending name:init_tm_tx, pid: {}, ec: {}", tx_id, reply.pid, reply.ec); @@ -1272,17 +1302,14 @@ ss::future tx_gateway_frontend::limit_init_tm_tx( auto term_opt = co_await stm->sync(); if (!term_opt.has_value()) { if (term_opt.error() == tm_stm::op_status::not_leader) { - vlog( - txlog.trace, - "this node isn't a leader for tx.id={} coordinator", - tx_id); + vlog(txlog.trace, "[tx_id={}] node is not a leader", tx_id); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } vlog( txlog.warn, - "got error {} on syncing (initializing tx.id={})", - term_opt.error(), - tx_id); + "[tx_id={}] error syncing stm - {}", + tx_id, + 
term_opt.error()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } auto term = term_opt.value(); @@ -1370,17 +1397,23 @@ ss::future tx_gateway_frontend::do_init_tm_tx( model::timeout_clock::duration timeout, model::producer_identity expected_pid) { if (!stm->hosts(tx_id)) { + vlog( + txlog.debug, + "[tx_id={}] stm on partition {} doesn't host transaction_id", + tx_id, + stm->get_partition()); co_return tx_errc::not_coordinator; } auto r0 = co_await get_tx(term, stm, tx_id, timeout); + if (!r0.has_value()) { if (r0.error() != tx_errc::tx_not_found) { vlog( txlog.warn, - "got error {} on loading tx.id={}", - r0.error(), - tx_id); + "[tx_id={}] error getting transaction: {}", + tx_id, + r0.error()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } @@ -1388,7 +1421,11 @@ ss::future tx_gateway_frontend::do_init_tm_tx( allocate_id_reply pid_reply = co_await _id_allocator_frontend.local().allocate_id(timeout); if (pid_reply.ec != errc::success) { - vlog(txlog.warn, "allocate_id failed with {}", pid_reply.ec); + vlog( + txlog.warn, + "[tx_id={}] allocate_id failed - {}", + tx_id, + pid_reply.ec); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } @@ -1402,23 +1439,20 @@ ss::future tx_gateway_frontend::do_init_tm_tx( } else if (op_status == tm_stm::op_status::conflict) { vlog( txlog.warn, - "got conflict on registering new producer {} for tx.id={}", - pid, - tx_id); + "[tx_id={}] got conflict on registering new producer {}", + tx_id, + pid); reply.ec = tx_errc::conflict; } else if (op_status == tm_stm::op_status::not_leader) { - vlog( - txlog.warn, - "this node isn't a leader for tx.id={} coordinator", - tx_id); + vlog(txlog.warn, "[tx_id={}] this node is not coordinator", tx_id); reply.ec = tx_errc::not_coordinator; } else { vlog( txlog.warn, - "got {} on registering new producer {} for tx.id={}", - op_status, + "[tx_id={}] error registering producer with pid: {} - {}", + tx_id, pid, - tx_id); + op_status); reply.ec = tx_errc::invalid_txn_state; } co_return reply; @@ -1426,14 +1460,22 @@ ss::future tx_gateway_frontend::do_init_tm_tx( auto tx = r0.value(); if (!is_valid_producer(tx, expected_pid)) { + vlog( + txlog.info, + "[tx_id={}] producer with pid: {} for {} is invalid", + tx_id, + expected_pid, + tx); co_return init_tm_tx_reply{tx_errc::invalid_producer_epoch}; } checked r(tx); if (tx.status == tm_transaction::tx_status::ongoing) { + vlog(txlog.info, "[tx_id={}] tx is ongoing, aborting", tx_id); r = co_await do_abort_tm_tx(term, stm, tx, timeout); } else if (tx.status == tm_transaction::tx_status::preparing) { + vlog(txlog.info, "[tx_id={}] tx is preparing, aborting", tx_id); // preparing is obsolete, also it isn't acked until // it's prepared si it's safe to abort it r = co_await do_abort_tm_tx(term, stm, tx, timeout); @@ -1442,10 +1484,10 @@ ss::future tx_gateway_frontend::do_init_tm_tx( if (!r.has_value()) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r.error(), + "[tx_id={}] error rolling previous with status: {} - {}", tx_id, - tx.status); + tx.status, + r.error()); co_return init_tm_tx_reply{r.error()}; } @@ -1455,6 +1497,7 @@ ss::future tx_gateway_frontend::do_init_tm_tx( if (expected_pid == model::unknown_pid) { if (is_max_epoch(tx.pid.epoch)) { + vlog(txlog.trace, "[tx_id={}] allocating new producer id", tx_id); allocate_id_reply pid_reply = co_await _id_allocator_frontend.local().allocate_id(timeout); reply.pid = model::producer_identity{pid_reply.id, 0}; @@ -1480,7 +1523,12 @@ ss::future tx_gateway_frontend::do_init_tm_tx( 
co_return init_tm_tx_reply{tx_errc::invalid_producer_epoch}; } } - + vlog( + txlog.trace, + "[tx_id={}] re-registering producer with new pid: {}, last_pid: {}", + tx.id, + reply.pid, + last_pid); auto op_status = co_await stm->re_register_producer( term, tx.id, transaction_timeout_ms, reply.pid, last_pid); if (op_status == tm_stm::op_status::success) { @@ -1492,10 +1540,10 @@ ss::future tx_gateway_frontend::do_init_tm_tx( } else { vlog( txlog.warn, - "got error {} on re-registering a producer {} for tx.id={}", - op_status, + "[tx_id={}] error re-registering a producer {} - {}", + tx.id, reply.pid, - tx.id); + op_status); reply.ec = tx_errc::invalid_txn_state; } co_return reply; @@ -1505,6 +1553,12 @@ ss::future tx_gateway_frontend::add_partition_to_tx( add_paritions_tx_request request, model::timeout_clock::duration timeout) { auto tx_ntp_opt = co_await ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.trace, + "[tx_id={}] unable to find ntp, producer_id: {}, epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return make_add_partitions_error_response( request, tx_errc::coordinator_not_available); } @@ -1512,11 +1566,23 @@ ss::future tx_gateway_frontend::add_partition_to_tx( auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.trace, "can't find a shard for {}", tx_ntp); + vlog( + txlog.trace, + "[tx_id={}] can't find a shard for {}, producer_id: {}, epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return make_add_partitions_error_response( request, tx_errc::coordinator_not_available); } - + vlog( + txlog.trace, + "[tx_id={}] adding partition to tx. pid: {}, epoch: {}, topics: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch, + request.topics); co_return co_await container().invoke_on( *shard, _ssg, @@ -1587,10 +1653,10 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( if (!r.has_value()) { vlog( txlog.trace, - "got {} on pulling ongoing tx:{} pid:{}", - r.error(), + "[tx_id={}] error getting ongoing transaction for pid: {} - {}", request.transactional_id, - pid); + pid, + r.error()); co_return make_add_partitions_error_response(request, r.error()); } auto tx = r.value(); @@ -1696,7 +1762,12 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( auto status = co_await stm->add_partitions(term, tx.id, partitions); auto has_added = status == tm_stm::op_status::success; if (!has_added) { - vlog(txlog.warn, "adding partitions failed with {}", status); + vlog( + txlog.warn, + "[tx_id={}] adding partitions failed pid: {} - {}", + request.transactional_id, + pid, + status); } for (auto& br : brs) { auto topic_it = std::find_if( @@ -1712,7 +1783,10 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( if (br.ec != tx_errc::none) { vlog( txlog.warn, - "begin_tx request to {} failed with {}", + "[tx_id={}] begin_tx request for pid: {} at ntp: {} failed - " + "{}", + request.transactional_id, + pid, br.ntp, br.ec); } @@ -1725,8 +1799,24 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( ss::future tx_gateway_frontend::add_offsets_to_tx( add_offsets_tx_request request, model::timeout_clock::duration timeout) { + vlog( + txlog.trace, + "[tx_id={}] adding offsets to tx, group_id: {}, producer id: " + "{}, producer epoch: {}", + request.transactional_id, + request.group_id, + request.producer_id, + request.producer_epoch); + auto tx_ntp_opt = co_await 
ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.warn, + "[tx_id={}] unable to find coordinator ntp, producer id: {}, " + "producer epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return add_offsets_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1734,7 +1824,14 @@ ss::future tx_gateway_frontend::add_offsets_to_tx( auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tx_ntp); + vlog( + txlog.warn, + "[tx_id={}] can't find a shard for {} producer id: {}, " + "producer epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return add_offsets_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1804,11 +1901,11 @@ ss::future tx_gateway_frontend::do_add_offsets_to_tx( term, stm, pid, request.transactional_id, timeout); if (!r.has_value()) { vlog( - txlog.trace, - "got {} on pulling ongoing tx for pid:{} {}", - r.error(), + txlog.warn, + "[tx_id={}] error getting ongoing transaction for pid: {} - {}", + request.transactional_id, pid, - request.transactional_id); + r.error()); co_return add_offsets_tx_reply{.error_code = r.error()}; } auto tx = r.value(); @@ -1816,7 +1913,15 @@ ss::future tx_gateway_frontend::do_add_offsets_to_tx( auto group_info = co_await _rm_group_proxy->begin_group_tx( request.group_id, pid, tx.tx_seq, tx.timeout_ms, stm->get_partition()); if (group_info.ec != tx_errc::none) { - vlog(txlog.warn, "error on begining group tx: {}", group_info.ec); + vlog( + txlog.warn, + "[tx_id={}] error starting group transaction for pid: {}, group: {} " + "- {}", + request.transactional_id, + pid, + request.group_id, + group_info.ec); + co_return add_offsets_tx_reply{.error_code = group_info.ec}; } @@ -1824,7 +1929,12 @@ ss::future tx_gateway_frontend::do_add_offsets_to_tx( term, tx.id, request.group_id, group_info.etag); auto has_added = status == tm_stm::op_status::success; if (!has_added) { - vlog(txlog.warn, "can't add group to tm_stm: {}", status); + vlog( + txlog.warn, + "[tx_id={}] error adding group to tm_stm for pid: {} group: {}", + request.transactional_id, + pid, + request.group_id); co_return add_offsets_tx_reply{ .error_code = tx_errc::invalid_txn_state}; } @@ -1833,8 +1943,24 @@ ss::future tx_gateway_frontend::do_add_offsets_to_tx( ss::future tx_gateway_frontend::end_txn( end_tx_request request, model::timeout_clock::duration timeout) { + vlog( + txlog.trace, + "[tx_id={}] end transaction. 
producer id: {}, producer epoch: {}, " + "committed: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch, + request.committed); + auto tx_ntp_opt = co_await ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.trace, + "[tx_id={}] can not find coordinator ntp, producer id: {}, producer " + "epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return end_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1842,7 +1968,14 @@ ss::future tx_gateway_frontend::end_txn( auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tx_ntp); + vlog( + txlog.warn, + "[tx_id={}] can't find a shard for {}, producer id: {}, producer " + "epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return end_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1874,11 +2007,11 @@ ss::future tx_gateway_frontend::do_end_txn( model::producer_identity pid{ request.producer_id, request.producer_epoch}; vlog( - txlog.trace, - "got {} on pulling a stm for tx:{} pid:{}", - r.error(), + txlog.warn, + "[tx_id={}] error getting transaction from coordinator pid: {} - {}", request.transactional_id, - pid); + pid, + r.error()); return ss::make_ready_future( end_tx_reply{.error_code = r.error()}); } @@ -1897,7 +2030,7 @@ ss::future tx_gateway_frontend::do_end_txn( return ss::make_ready_future( end_tx_reply{.error_code = tx_errc::coordinator_not_available}); } - vlog(txlog.trace, "re-entered tm_stm's gate"); + auto h = stm->gate().hold(); ssx::spawn_with_gate( @@ -1939,13 +2072,13 @@ ss::future tx_gateway_frontend::do_end_txn( if (!outcome->available()) { vlog( txlog.warn, - "outcome for tx:{} pid:{} isn't set", + "[tx_id={}] outcome for transaction " + "is missing, pid: {}", tx_id, pid); outcome->set_value( tx_errc::unknown_server_error); } - vlog(txlog.trace, "left tm_stm's gate"); }); }) .finally([u = std::move(unit)] {}); @@ -1971,7 +2104,7 @@ tx_gateway_frontend::do_end_txn( } catch (...) { vlog( txlog.warn, - "sync on end_txn for tx:{} pid:{} failed with {}", + "[tx_id={}] sync on end_txn failed pid: {} - {}", request.transactional_id, pid, std::current_exception()); @@ -1985,10 +2118,10 @@ tx_gateway_frontend::do_end_txn( } vlog( txlog.warn, - "sync on end_txn for tx:{} pid:{} failed with {}", + "[tx_id={}] sync on end_txn failed pid: {} - {}", request.transactional_id, pid, - term_opt.error()); + std::current_exception()); outcome->set_value(tx_errc::invalid_txn_state); co_return tx_errc::invalid_txn_state; } @@ -2005,7 +2138,9 @@ tx_gateway_frontend::do_end_txn( if (err == tx_errc::tx_not_found) { vlog( txlog.warn, - "can't find an ongoing tx:{} pid:{} to commit / abort", + "[tx_id={}] can't find an ongoing transaction pid: {} to commit " + "/ " + "abort", request.transactional_id, pid); err = tx_errc::unknown_server_error; @@ -2020,8 +2155,8 @@ tx_gateway_frontend::do_end_txn( if (tx.status == tm_transaction::tx_status::killed) { vlog( txlog.warn, - "can't commit an expired tx:{} pid:{} etag:{} tx_seq:{} in " - "term:{}", + "[tx_id={}] can't commit an expired transaction pid: {} etag: {} " + "tx_seq: {} in term: {}", tx.id, tx.pid, tx.etag, @@ -2038,10 +2173,11 @@ tx_gateway_frontend::do_end_txn( } catch (...) 
{ vlog( txlog.error, - "committing a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + "[tx_id={}] error committing transaction pid: {} etag: {} " + "tx_seq: {} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, std::current_exception()); if (!outcome->available()) { @@ -2052,8 +2188,8 @@ tx_gateway_frontend::do_end_txn( } else { vlog( txlog.warn, - "an ongoing tx:{} pid:{} etag:{} tx_seq:{} in term:{} has " - "unexpected status: {}", + "[tx_id={}] an ongoing transaction pid: {} etag: {} tx_seq: {} " + "in term: {} has an unexpected status: {}", tx.id, tx.pid, tx.etag, @@ -2067,8 +2203,8 @@ tx_gateway_frontend::do_end_txn( if (tx.status == tm_transaction::tx_status::killed) { vlog( txlog.warn, - "can't abort an expired tx:{} pid:{} etag:{} tx_seq:{} in " - "term:{}", + "[tx_id={}] can't abort an expired transaction pid: {} etag: {} " + "tx_seq: {} in term: {}", tx.id, tx.pid, tx.etag, @@ -2082,10 +2218,11 @@ tx_gateway_frontend::do_end_txn( } catch (...) { vlog( txlog.error, - "aborting a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + "[tx_id={}] error aborting transaction pid: {} etag: {} tx_seq: " + "{} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, std::current_exception()); outcome->set_value(tx_errc::unknown_server_error); @@ -2096,11 +2233,12 @@ tx_gateway_frontend::do_end_txn( } else { auto ec = r.error(); vlog( - txlog.info, - "aborting a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + txlog.error, + "[tx_id={}] error aborting transaction pid: {} etag: {} tx_seq: " + "{} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, ec); outcome->set_value(ec); @@ -2117,7 +2255,7 @@ tx_gateway_frontend::do_end_txn( if (!ongoing_tx.has_value()) { vlog( txlog.warn, - "can't mark {} as ongoing error:{}", + "[tx_id={}] can't mark transaction as ongoing - {}", tx.id, ongoing_tx.error()); co_return tx_errc::unknown_server_error; @@ -2146,17 +2284,17 @@ tx_gateway_frontend::remove_deleted_partitions_from_tx( if (result) { vlog( txlog.info, - "Deleted non existent partition {} from tx {}", - part.ntp, - tx); + "[tx_id={}] Deleted non existent partition {} from transaction", + tx.id, + part.ntp); tx = result.value(); } else { vlog( txlog.debug, - "Error {} deleting partition {} from tx {}", - result.error(), + "[tx_id={}] Error deleting partition {} from transaction - {}", + tx.id, part.ntp, - tx); + result.error()); break; } } @@ -2172,12 +2310,12 @@ tx_gateway_frontend::do_abort_tm_tx( if (!stm->is_actual_term(expected_term)) { vlog( txlog.trace, - "txn coordinator isn't synced with term:{} tx:{} etag:{} pid:{} " - "tx_seq:{}", - expected_term, + "[tx_id={}] txn coordinator isn't synced with term: {} pid: {} etag: " + "{} tx_seq: {}", tx.id, - tx.etag, + expected_term, tx.pid, + tx.etag, tx.tx_seq); co_return tx_errc::not_coordinator; } @@ -2192,12 +2330,12 @@ tx_gateway_frontend::do_abort_tm_tx( && tx.status != tm_transaction::tx_status::preparing) { vlog( txlog.warn, - "abort encontered a tx with unexpected status:{} (tx:{} etag:{} " - "pid:{} tx_seq:{}) in term:{}", - tx.status, + "[tx_id={}] abort encountered a transaction with unexpected status: " + "{}, pid: {}, etag: {} tx_seq: {} in term: {}", tx.id, - tx.etag, + tx.status, tx.pid, + tx.etag, tx.tx_seq, expected_term); co_return tx_errc::invalid_txn_state; @@ -2208,15 +2346,15 @@ tx_gateway_frontend::do_abort_tm_tx( if (is_fetch_tx_supported()) { vlog( txlog.trace, - "aborting tx:{} etag:{} pid:{} tx_seq:{} status:{} failed with " - "{} in term:{}", + "[tx_id={}] aborting transaction pid: {} etag: {} tx_seq: {} " + 
"status: {} in term: {} failed - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, tx.status, - changed_tx.error(), - expected_term); + expected_term, + changed_tx.error()); co_return tx_errc::coordinator_not_available; } else { if (changed_tx.error() == tm_stm::op_status::not_leader) { @@ -2333,7 +2471,8 @@ tx_gateway_frontend::do_commit_tm_tx( vlog( txlog.trace, - "marking tx_id:{} pid:{} tx_seq:{} etag:{} as prepared in term:{}", + "[tx_id={}] marking transaction pid: {} tx_seq: {} etag: {} as prepared " + "in term: {}", tx.id, tx.pid, tx.tx_seq, @@ -2343,11 +2482,13 @@ tx_gateway_frontend::do_commit_tm_tx( if (!changed_tx.has_value()) { vlog( txlog.trace, - "got {} on committing {} pid:{} tx_seq:{}", - changed_tx.error(), + "[tx_id={}] error committing transaction pid: {} etag: {} tx_seq: {} " + "- {}", tx.id, tx.pid, - tx.tx_seq); + tx.etag, + tx.tx_seq, + changed_tx.error()); if (!is_fetch_tx_supported()) { if (changed_tx.error() == tm_stm::op_status::not_leader) { @@ -2412,8 +2553,9 @@ tx_gateway_frontend::recommit_tm_tx( rejected = true; vlog( txlog.warn, - "commit_tx on consumer groups tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} was rejected", + "[tx_id={}] commit_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} " + "status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2424,8 +2566,8 @@ tx_gateway_frontend::recommit_tm_tx( failed = true; vlog( txlog.trace, - "commit_tx on consumer groups tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} failed with {}", + "[tx_id={}] commit_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2442,8 +2584,8 @@ tx_gateway_frontend::recommit_tm_tx( rejected = true; vlog( txlog.warn, - "commit_tx on data partition tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} was rejected", + "[tx_id={}] commit_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2454,8 +2596,8 @@ tx_gateway_frontend::recommit_tm_tx( failed = true; vlog( txlog.trace, - "commit_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + "[tx_id={}] commit_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2478,8 +2620,8 @@ tx_gateway_frontend::recommit_tm_tx( // request won't be ever processed vlog( txlog.warn, - "remote commit of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " - "is rejected", + "[tx_id={}] remote commit etag: {} pid: {} tx_seq: {} in term: " + "{} rejected", tx.id, tx.etag, tx.pid, @@ -2490,7 +2632,13 @@ tx_gateway_frontend::recommit_tm_tx( tx = co_await remove_deleted_partitions_from_tx(stm, expected_term, tx); if (co_await sleep_abortable(delay_ms, _as)) { - vlog(txlog.trace, "retrying re-commit pid:{}", tx.pid); + vlog( + txlog.trace, + "[tx_id={}] retrying re-commit etag: {} pid: {} tx_seq: {}", + tx.id, + tx.etag, + tx.pid, + tx.tx_seq); } else { break; } @@ -2498,7 +2646,7 @@ tx_gateway_frontend::recommit_tm_tx( if (!done) { vlog( txlog.warn, - "remote commit of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote commit etag: {} pid: {} tx_seq: {} in term: {} " "failed", tx.id, tx.etag, @@ -2542,8 +2690,8 @@ tx_gateway_frontend::reabort_tm_tx( rejected = true; vlog( txlog.warn, - "abort_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} was rejected", + "[tx_id={}] abort_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: 
{} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2553,9 +2701,9 @@ tx_gateway_frontend::reabort_tm_tx( } else if (r.ec != tx_errc::none) { failed = true; vlog( - txlog.trace, - "abort_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + txlog.info, + "[tx_id={}] abort_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2571,8 +2719,8 @@ tx_gateway_frontend::reabort_tm_tx( rejected = true; vlog( txlog.warn, - "abort_tx on consumer groups tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} was rejected", + "[tx_id={}] abort_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2583,8 +2731,8 @@ tx_gateway_frontend::reabort_tm_tx( failed = true; vlog( txlog.trace, - "abort_tx on consumer groups tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + "[tx_id={}] abort_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2602,7 +2750,7 @@ tx_gateway_frontend::reabort_tm_tx( if (rejected && !failed) { vlog( txlog.warn, - "remote abort of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote abort etag: {} pid: {} tx_seq: {} in term: {} " "was rejected", tx.id, tx.etag, @@ -2619,7 +2767,7 @@ tx_gateway_frontend::reabort_tm_tx( if (!done) { vlog( txlog.warn, - "remote abort of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote abort etag: {} pid: {} tx_seq: {} in term: {} " "failed", tx.id, tx.etag, @@ -2649,21 +2797,17 @@ ss::future> tx_gateway_frontend::bump_etag( if (!r1.has_value()) { // we invoke bump when we have the most up-to-date info // so reject isn't possible + vlog( + txlog.error, + "[tx_id={}] error rolling previous transaction with status: {} - " + "{}", + tx.id, + tx.status, + r1.error()); if (r1.error() == tx_errc::request_rejected) { - vlog( - txlog.error, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); co_return tx_errc::invalid_txn_state; } - vlog( - txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); + // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; } @@ -2673,10 +2817,10 @@ ss::future> tx_gateway_frontend::bump_etag( auto r2 = co_await stm->update_tx(tx, term); if (!r2) { vlog( - txlog.info, - "got {} on bumping etag on reading {}", - r1.error(), - tx.id); + txlog.error, + "[tx_id={}] error bumping etag - {}", + tx.id, + r2.error()); co_return tx_errc::not_coordinator; } co_return r2.value(); @@ -2705,10 +2849,10 @@ ss::future> tx_gateway_frontend::forget_tx( if (!r1.has_value() && r1.error() != tx_errc::request_rejected) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), + "[tx_id={}] error rolling previous id with status: {} - {}", tx.id, - tx.status); + tx.status, + r1.error()); // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; @@ -2716,7 +2860,8 @@ ss::future> tx_gateway_frontend::forget_tx( auto ec = co_await stm->expire_tx(term, tx.id); if (ec != tm_stm::op_status::success) { - vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id); + vlog( + txlog.warn, "[tx_id={}] error expiring transaction - {}", tx.id, ec); co_return tx_errc::not_coordinator; } @@ -2736,9 +2881,9 @@ ss::future> tx_gateway_frontend::get_tx( } else { 
vlog( txlog.warn, - "got error {} on lookin up tx.id={}", - tx_opt.error(), - tid); + "[tx_id={}] error getting transaction - {}", + tid, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return tx_errc::not_coordinator; } @@ -2751,9 +2896,9 @@ ss::future> tx_gateway_frontend::get_tx( if (!tx_opt.has_value()) { vlog( txlog.warn, - "got error {} on rehydrating tx.id={}", - tx_opt.error(), - tid); + "[tx_id={}] error rehydrating transaction info - {}", + tid, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return tx_errc::not_coordinator; } @@ -2794,10 +2939,12 @@ ss::future> tx_gateway_frontend::get_tx( if (!r1.has_value()) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); + "[tx_id={}] error rolling previous transaction id with status: " + "{} - {}", + tid, + tx.status, + r1.error()); + // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; } @@ -2808,13 +2955,10 @@ ss::future> tx_gateway_frontend::get_tx( if (!r1) { vlog( txlog.warn, - "Can't fetch cached state of the persisted tx (pid:{} etag:{} " - "tx_seq:{} status:{}); true state is unknown, terminating session to " - "avoid data loss", - tx.pid, - tx.etag, - tx.tx_seq, - tx.status); + "[tx_id={}] Can't fetch cached state of the persisted tx: {}, true " + "state is unknown, terminating session to avoid data loss", + tx.id, + tx); co_return co_await forget_tx(term, stm, tx, timeout); } @@ -2830,9 +2974,10 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.pid != tx.pid) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of " - "the persisted (pid:{} etag:{} tx_seq:{} status:{}) should " - "have same pid; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: {} " + "status: {}) of the persisted (pid: {} etag: {} tx_seq: {} " + "status: {}) should have the same pid, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2850,10 +2995,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2869,10 +3015,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tx.status) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have same status; terminating " - "session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "same status, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2882,6 +3029,7 @@ ss::future> tx_gateway_frontend::get_tx( tx.etag, tx.tx_seq, tx.status); + co_return co_await forget_tx(term, stm, tx, timeout); } } @@ -2893,9 +3041,10 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.pid != tx.pid) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of the " - "persisted (pid:{} 
etag:{} tx_seq:{} status:{}) should have same " - "pid; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) should have same pid, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2905,6 +3054,7 @@ ss::future> tx_gateway_frontend::get_tx( tx.etag, tx.tx_seq, tx.status); + co_return co_await forget_tx(term, stm, tx, timeout); } @@ -2921,10 +3071,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2944,17 +3095,20 @@ ss::future> tx_gateway_frontend::get_tx( && old_tx.status != tm_transaction::tx_status::aborting) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have either ongoing or " - "aborting status; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "either ongoing or aborting status; terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, old_tx.tx_seq, + old_tx.status, tx.pid, tx.etag, - tx.tx_seq); + tx.tx_seq, + tx.status); co_return co_await forget_tx(term, stm, tx, timeout); } old_tx = tx; @@ -2977,10 +3131,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -3000,17 +3155,20 @@ ss::future> tx_gateway_frontend::get_tx( && old_tx.status != tm_transaction::tx_status::prepared) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have either ongoing or " - "prepared status; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "either ongoing or prepared status, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, old_tx.tx_seq, + old_tx.status, tx.pid, tx.etag, - tx.tx_seq); + tx.tx_seq, + tx.status); co_return co_await forget_tx(term, stm, tx, timeout); } old_tx = tx; @@ -3027,9 +3185,11 @@ ss::future> tx_gateway_frontend::get_tx( // the same vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of the " - "persisted (pid:{} etag:{} tx_seq:{} status:{}) should have same " - "txseq and status; terminating session", + "[tx_id={}] A 
cached tx (tx: {} pid: {} etag: {} tx_seq: "
+              "{} status: {}) of the persisted (pid: {} etag: {} "
+              "tx_seq: {} status: {}) should have the same tx_seq and status, "
+              "terminating session",
+              tx.id,
               old_tx.id,
               old_tx.pid,
               old_tx.etag,
@@ -3049,8 +3209,9 @@ ss::future> tx_gateway_frontend::get_tx(

     vlog(
       txlog.warn,
-      "A persisted tx (pid:{} etag:{} tx_seq:{} status:{}) has unexpected "
-      "status",
+      "[tx_id={}] A persisted tx (pid: {} etag: {} tx_seq: {} status: {}) has "
+      "unexpected status",
+      tx.id,
       tx.pid,
       tx.etag,
       tx.tx_seq,
@@ -3065,6 +3226,12 @@ ss::future> tx_gateway_frontend::get_latest_tx(
   model::producer_identity pid,
   kafka::transactional_id tx_id,
   model::timeout_clock::duration timeout) {
+    vlog(
+      txlog.trace,
+      "[tx_id={}] Getting latest tx for pid: {} in term: {}",
+      tx_id,
+      pid,
+      term);
     auto r0 = co_await get_tx(term, stm, tx_id, timeout);
     if (!r0.has_value()) {
         co_return r0.error();
@@ -3082,14 +3249,19 @@ ss::future> tx_gateway_frontend::get_latest_tx(
     if (tx.pid.id == pid.id && tx.pid.epoch > pid.epoch) {
         vlog(
           txlog.info,
-          "Producer {} (tx:{}) is fenced off by {}",
-          pid,
+          "[tx_id={}] producer {} is fenced off by {}",
           tx_id,
+          pid,
           tx.pid);
         co_return tx_errc::fenced;
     }
+    vlog(
+      txlog.info,
+      "[tx_id={}] transaction is mapped to {} not {}",
+      tx_id,
+      tx.pid,
+      pid);

-    vlog(txlog.info, "tx:{} is mapped to {} not {}", tx_id, tx.pid, pid);
     co_return tx_errc::invalid_producer_id_mapping;
 }

@@ -3103,6 +3275,11 @@ tx_gateway_frontend::get_ongoing_tx(
   model::producer_identity pid,
   kafka::transactional_id tx_id,
   model::timeout_clock::duration timeout) {
+    vlog(
+      txlog.trace,
+      "[tx_id={}] Getting ongoing transactions for pid: {}",
+      tx_id,
+      pid);
     auto r0 = co_await get_latest_tx(term, stm, pid, tx_id, timeout);
     if (!r0.has_value()) {
         if (r0.error() == tx_errc::tx_not_found) {
@@ -3133,7 +3310,8 @@ tx_gateway_frontend::get_ongoing_tx(
             // start a new transactions
             vlog(
               txlog.warn,
-              "can't modify an expired tx:{} pid:{} etag:{} tx_seq:{} in term:{}",
+              "[tx_id={}] can't modify an expired transaction pid: {} etag: {} "
+              "tx_seq: {} in term: {}",
               tx.id,
               tx.pid,
               tx.etag,
@@ -3157,9 +3335,14 @@ tx_gateway_frontend::get_ongoing_tx(
         if (!ongoing_tx.has_value()) {
             vlog(
               txlog.warn,
-              "resetting tx as ongoing failed with {} pid:{}",
-              ongoing_tx.error(),
-              pid);
+              "[tx_id={}] failed resetting ongoing transaction pid: {} etag: "
+              "{} tx_seq: {} term: {}",
+              tx.id,
+              tx.pid,
+              tx.etag,
+              tx.tx_seq,
+              term);
+
             co_return tx_errc::invalid_txn_state;
         }
         co_return ongoing_tx.value();
@@ -3247,14 +3430,14 @@ ss::future<> tx_gateway_frontend::expire_old_tx(
         if (term_opt.error() == tm_stm::op_status::not_leader) {
             vlog(
               txlog.trace,
-              "this node isn't a leader for tx.id={} coordinator",
+              "[tx_id={}] this node is not the current coordinator",
               tx_id);
         }
         vlog(
           txlog.warn,
-          "got error {} on syncing state machine (loading tx.id={})",
-          term_opt.error(),
-          tx_id);
+          "[tx_id={}] error syncing state machine - {}",
+          tx_id,
+          term_opt.error());
         co_return;
     }
@@ -3284,7 +3467,8 @@ ss::future tx_gateway_frontend::do_expire_old_tx(

     vlog(
       txlog.trace,
-      "attempting to expire tx id:{} pid:{} tx_seq:{} status:{}",
+      "[tx_id={}] attempting to expire transaction pid: {} tx_seq: {} status: "
+      "{}",
       tx.id,
       tx.pid,
       tx.tx_seq,
@@ -3296,7 +3480,11 @@ ss::future tx_gateway_frontend::do_expire_old_tx(
         r = co_await do_abort_tm_tx(term, stm, tx, timeout);
     }
     if (!r.has_value()) {
-        vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx.id);
+        vlog(
+          txlog.warn,
+          "[tx_id={}] error aborting transaction - 
{}", + tx.id, + r.error()); co_return r.error(); } @@ -3304,7 +3492,8 @@ ss::future tx_gateway_frontend::do_expire_old_tx( // it will be retried and it's an idempotent operation auto ec = co_await stm->expire_tx(term, tx.id); if (ec != tm_stm::op_status::success) { - vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id); + vlog( + txlog.warn, "[tx_id={}] error expiring transaction - {}", tx.id, ec); co_return tx_errc::not_coordinator; } @@ -3413,7 +3602,11 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { auto shard = _shard_table.local().shard_for(tm_ntp); if (!shard.has_value()) { - vlog(txlog.warn, "can't find a shard for {}", tm_ntp); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition shard not found", + tid, + tm_ntp); co_return tx_errc::shard_not_found; } @@ -3426,8 +3619,10 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { if (!partition) { vlog( txlog.warn, - "transaction manager partition: {} not found", + "[tx_id={}] transaction manager {} partition not found", + tid, tm_ntp); + return ss::make_ready_future>( tx_errc::partition_not_found); } @@ -3436,7 +3631,10 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { if (!stm) { vlog( - txlog.error, "can't get tm stm of the {}' partition", tm_ntp); + txlog.warn, + "[tx_id={}] can not get transactional manager stm for {}", + tid, + tm_ntp); return ss::make_ready_future>( tx_errc::stm_not_found); } @@ -3489,7 +3687,11 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( } auto shard = _shard_table.local().shard_for(tm_ntp.value()); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tm_ntp.value()); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition shard not found", + tid, + tm_ntp); co_return tx_errc::shard_not_found; } @@ -3497,7 +3699,11 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( *shard, _ssg, [tid, ntp, tm_ntp](tx_gateway_frontend& self) { auto partition = self._partition_manager.local().get(tm_ntp.value()); if (!partition) { - vlog(txlog.warn, "can't get partition by {} ntp", tm_ntp.value()); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition not found", + tid, + tm_ntp); return ss::make_ready_future(tx_errc::invalid_txn_state); } @@ -3506,8 +3712,9 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( if (!stm) { vlog( txlog.warn, - "can't get tm stm of the {}' partition", - tm_ntp.value()); + "[tx_id={}] can not get transactional manager stm for {}", + tid, + tm_ntp); return ss::make_ready_future(tx_errc::invalid_txn_state); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 2c532944ab17e..abb33ca5537ee 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1115,6 +1115,13 @@ operator<<(std::ostream& o, const fetch_tx_reply::tx_partition& p) { return o; } + +std::ostream& +operator<<(std::ostream& o, const add_paritions_tx_request::topic& t) { + fmt::print(o, "{{topic: {}, partitions: {}}}", t.name, t.partitions); + return o; +} + } // namespace cluster namespace reflection { diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index e31fbfdbbc6c9..b2e471b86c5b5 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -306,6 +306,7 @@ struct add_paritions_tx_request { struct topic { model::topic name{}; std::vector partitions{}; + friend std::ostream& operator<<(std::ostream&, const topic&); }; kafka::transactional_id transactional_id{}; kafka::producer_id producer_id{}; From 
8d8ef01fad733a1051897682effb1abb0ddae97c Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Tue, 7 Nov 2023 16:34:15 +0100
Subject: [PATCH 6/7] rm_stm: clear metrics when stopping rm_stm

When `rm_stm` is stopped it should release all of the resources it uses,
including its registered metrics. This allows the partition to be recreated
if required.

Signed-off-by: Michal Maslanka
(cherry picked from commit 4a82177f69a475d5d11d263ba746fa5ca630ff53)
---
 src/v/cluster/rm_stm.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc
index d3b4cf1f30cd8..1279868fdb745 100644
--- a/src/v/cluster/rm_stm.cc
+++ b/src/v/cluster/rm_stm.cc
@@ -1189,6 +1189,7 @@ ss::future> rm_stm::do_replicate(
 ss::future<> rm_stm::stop() {
     auto_abort_timer.cancel();
     _log_stats_timer.cancel();
+    _metrics.clear();
     return raft::state_machine::stop();
 }

From 3d338fb5241b2b7d9f0382749378dc06eb84aa81 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Mon, 20 Nov 2023 17:08:06 +0100
Subject: [PATCH 7/7] cs/manifest: allow updating kafka start offset with the
 same offset

Previously, validation failed when `_kafka_start_offset` was overridden with
the same offset. Since re-setting the offset to the same value doesn't change
anything, we may relax the validation here.

Signed-off-by: Michal Maslanka
(cherry picked from commit b1c1d014a1b6b263a465f5796e9586a9ba9150cf)
---
 src/v/cloud_storage/partition_manifest.cc            | 2 +-
 src/v/cloud_storage/tests/partition_manifest_test.cc | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/v/cloud_storage/partition_manifest.cc b/src/v/cloud_storage/partition_manifest.cc
index 85b89df4e6ec1..cffc7aea0bb3e 100644
--- a/src/v/cloud_storage/partition_manifest.cc
+++ b/src/v/cloud_storage/partition_manifest.cc
@@ -730,7 +730,7 @@ void partition_manifest::set_archive_clean_offset(

 bool partition_manifest::advance_start_kafka_offset(
   kafka::offset new_start_offset) {
-    if (_start_kafka_offset_override >= new_start_offset) {
+    if (_start_kafka_offset_override > new_start_offset) {
         return false;
     }
     _start_kafka_offset_override = new_start_offset;

diff --git a/src/v/cloud_storage/tests/partition_manifest_test.cc b/src/v/cloud_storage/tests/partition_manifest_test.cc
index cab061378d374..d27db9dc00d6d 100644
--- a/src/v/cloud_storage/tests/partition_manifest_test.cc
+++ b/src/v/cloud_storage/tests/partition_manifest_test.cc
@@ -1431,9 +1431,9 @@ SEASTAR_THREAD_TEST_CASE(test_partition_manifest_start_kafka_offset_advance) {
       m.get_start_kafka_offset_override(), kafka::offset(370));
     BOOST_REQUIRE_EQUAL(m.get_start_offset(), model::offset(100));
     BOOST_REQUIRE_EQUAL(m.get_start_kafka_offset(), kafka::offset(90));
-
+    // Allow update with the same value
+    BOOST_REQUIRE(m.advance_start_kafka_offset(kafka::offset(370)));
     // If trying to move back, it should no-op.
-    BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(370)));
     BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(369)));
     BOOST_REQUIRE_EQUAL(
       m.get_start_kafka_offset_override(), kafka::offset(370));
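
A note on [PATCH 6/7] above: the one-line fix relies on the usual Seastar metrics
lifecycle, where an ss::metrics::metric_groups instance keeps its series registered
until it is cleared or destroyed. Below is a minimal sketch of that pattern; the
example_stm class, the metric names and the _applied counter are illustrative
stand-ins, not Redpanda's actual rm_stm code.

    #include <seastar/core/future.hh>
    #include <seastar/core/metrics.hh>
    #include <seastar/core/metrics_registration.hh>

    namespace sm = seastar::metrics;

    // Illustrative state machine owning a metric group. It mirrors the rm_stm
    // pattern: metrics are registered at start and must be released in stop()
    // so a recreated partition can register the same metric names again.
    class example_stm {
    public:
        void setup_metrics() {
            _metrics.add_group(
              "example_stm",
              {sm::make_counter(
                "applied_ops",
                [this] { return _applied; },
                sm::description("Number of operations applied by the stm"))});
        }

        seastar::future<> stop() {
            // Without this, the metric series stay registered until the object
            // is destroyed, which can be later than the partition shutdown and
            // would clash with a newly created partition's registration.
            _metrics.clear();
            return seastar::make_ready_future<>();
        }

    private:
        uint64_t _applied{0};
        sm::metric_groups _metrics;
    };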
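
A note on [PATCH 7/7] above: changing the comparison from `>=` to `>` makes
re-applying the current override an idempotent success while still rejecting
attempts to move the start offset backwards. The standalone sketch below shows
the relaxed check; manifest_sketch and the plain int64_t offsets are
simplifications for illustration, not the actual partition_manifest and
kafka::offset types.

    #include <cassert>
    #include <cstdint>

    // Standalone illustration of the relaxed validation in
    // advance_start_kafka_offset(): re-setting the same override succeeds
    // (a no-op), while moving the override backwards is still rejected.
    struct manifest_sketch {
        int64_t start_kafka_offset_override = -1;

        bool advance_start_kafka_offset(int64_t new_start_offset) {
            // Before the patch this comparison used '>=', so re-applying the
            // current override returned false; '>' only rejects going backwards.
            if (start_kafka_offset_override > new_start_offset) {
                return false;
            }
            start_kafka_offset_override = new_start_offset;
            return true;
        }
    };

    int main() {
        manifest_sketch m;
        assert(m.advance_start_kafka_offset(370));  // first advance succeeds
        assert(m.advance_start_kafka_offset(370));  // same value: now allowed
        assert(!m.advance_start_kafka_offset(369)); // moving back is rejected
        return 0;
    }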