diff --git a/src/v/cloud_storage/partition_manifest.cc b/src/v/cloud_storage/partition_manifest.cc index 85b89df4e6ec1..cffc7aea0bb3e 100644 --- a/src/v/cloud_storage/partition_manifest.cc +++ b/src/v/cloud_storage/partition_manifest.cc @@ -730,7 +730,7 @@ void partition_manifest::set_archive_clean_offset( bool partition_manifest::advance_start_kafka_offset( kafka::offset new_start_offset) { - if (_start_kafka_offset_override >= new_start_offset) { + if (_start_kafka_offset_override > new_start_offset) { return false; } _start_kafka_offset_override = new_start_offset; diff --git a/src/v/cloud_storage/tests/partition_manifest_test.cc b/src/v/cloud_storage/tests/partition_manifest_test.cc index cab061378d374..d27db9dc00d6d 100644 --- a/src/v/cloud_storage/tests/partition_manifest_test.cc +++ b/src/v/cloud_storage/tests/partition_manifest_test.cc @@ -1431,9 +1431,9 @@ SEASTAR_THREAD_TEST_CASE(test_partition_manifest_start_kafka_offset_advance) { m.get_start_kafka_offset_override(), kafka::offset(370)); BOOST_REQUIRE_EQUAL(m.get_start_offset(), model::offset(100)); BOOST_REQUIRE_EQUAL(m.get_start_kafka_offset(), kafka::offset(90)); - + // Allow update with the same value + BOOST_REQUIRE(m.advance_start_kafka_offset(kafka::offset(370))); // If trying to move back, it should no-op. - BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(370))); BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(369))); BOOST_REQUIRE_EQUAL( m.get_start_kafka_offset_override(), kafka::offset(370)); diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index e24915be744f6..d0b23804b0f0d 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -18,9 +18,11 @@ #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/partition_leaders_table.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/node_config.h" #include "features/feature_table.h" +#include "model/metadata.h" #include "resource_mgmt/io_priority.h" #include "rpc/connection_cache.h" #include "utils/file_io.h" @@ -81,7 +83,7 @@ config_manager::config_manager( /** * Register notification immediately not to lose status updates. 
*/ - _member_removed_notification + _member_update_notification = _members.local().register_members_updated_notification( [this](model::node_id id, model::membership_state new_state) { handle_cluster_members_update(id, new_state); @@ -229,17 +231,24 @@ ss::future<> config_manager::start() { } void config_manager::handle_cluster_members_update( model::node_id id, model::membership_state new_state) { - if (new_state != model::membership_state::removed) { - return; + vlog( + clusterlog.debug, + "Processing membership notification: {{id: {} state: {}}}", + id, + new_state); + if (new_state == model::membership_state::active) { + // add an empty status placeholder if node is not yet known + status.try_emplace(id, config_status{.node = id}); + } else if (new_state == model::membership_state::removed) { + status.erase(id); } - status.erase(id); } ss::future<> config_manager::stop() { vlog(clusterlog.info, "Stopping Config Manager..."); _reconcile_wait.broken(); _members.local().unregister_members_updated_notification( - _member_removed_notification); + _member_update_notification); _leaders.local().unregister_leadership_change_notification( _raft0_leader_changed_notification); co_await _gate.close(); diff --git a/src/v/cluster/config_manager.h b/src/v/cluster/config_manager.h index bd3e53e1f445e..3554a4b35f2e3 100644 --- a/src/v/cluster/config_manager.h +++ b/src/v/cluster/config_manager.h @@ -135,7 +135,7 @@ class config_manager final { ss::sharded& _leaders; ss::sharded& _feature_table; ss::sharded& _members; - notification_id_type _member_removed_notification; + notification_id_type _member_update_notification; notification_id_type _raft0_leader_changed_notification; ss::condition_variable _reconcile_wait; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index d3b4cf1f30cd8..1279868fdb745 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1189,6 +1189,7 @@ ss::future> rm_stm::do_replicate( ss::future<> rm_stm::stop() { auto_abort_timer.cancel(); _log_stats_timer.cancel(); + _metrics.clear(); return raft::state_machine::stop(); } diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index bb9ee15d0ea6c..c3ff3fba2e4ed 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -159,6 +159,11 @@ ss::future tm_stm::try_init_hosted_transactions( ss::future tm_stm::include_hosted_transaction( model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] including hosted transactions in term: {}", + tx_id, + term); if (!_hosted_txes.inited) { co_return op_status::unknown; } @@ -174,6 +179,11 @@ ss::future tm_stm::include_hosted_transaction( ss::future tm_stm::exclude_hosted_transaction( model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] excluding hosted transactions in term: {}", + tx_id, + term); if (!_hosted_txes.inited) { co_return op_status::unknown; } @@ -358,7 +368,8 @@ ss::future tm_stm::do_update_hosted_transactions( offset, model::timeout_clock::now() + _sync_timeout)) { vlog( _ctx_log.info, - "timeout on waiting until {} is applied on updating hash_ranges", + "timeout on waiting until {} is applied on updating hosted " + "transactions", offset); if (_c->is_leader() && _c->term() == term) { co_await _c->step_down("txn coordinator apply timeout"); @@ -368,8 +379,8 @@ ss::future tm_stm::do_update_hosted_transactions( if (_c->term() != term) { vlog( _ctx_log.info, - "lost leadership while waiting until {} is applied on updating hash " - "ranges", + "lost leadership 
while waiting until {} is applied on updating " + "hosted transactions", offset); co_return op_status::unknown; } @@ -384,14 +395,19 @@ tm_stm::update_tx(tm_transaction tx, model::term_id term) { ss::future> tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { + vlog( + _ctx_log.trace, + "[tx_id={}] updating transaction: {} in term: {}", + tx.id, + tx, + term); auto batch = serialize_tx(tx); auto r = co_await replicate_quorum_ack(term, std::move(batch)); if (!r) { vlog( _ctx_log.info, - "got error {} on updating tx:{} pid:{} etag:{} tx_seq:{}", - r.error(), + "[tx_id={}] error updating tx: {} - {}", tx.id, - tx.pid, - tx.etag, - tx.tx_seq); + tx, + r.error()); @@ -411,9 +427,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { offset, model::timeout_clock::now() + _sync_timeout)) { vlog( _ctx_log.info, - "timeout on waiting until {} is applied on updating tx:{} pid:{} " - "tx_seq:{}", - offset, + "[tx_id={}] timeout waiting for offset {} to be applied, tx: {}", tx.id, - tx.pid, - tx.tx_seq); + offset, + tx); @@ -425,12 +439,12 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { if (_c->term() != term) { vlog( _ctx_log.info, - "lost leadership while waiting until {} is applied on updating tx:{} " - "pid:{} tx_seq:{}", - offset, + "[tx_id={}] lost leadership while waiting until offset {} is applied " + "tx: {}", tx.id, - tx.pid, - tx.tx_seq); + offset, + tx); + co_return tm_stm::op_status::unknown; } @@ -438,10 +452,9 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) { if (!tx_opt) { vlog( _ctx_log.warn, - "can't find an updated tx:{} pid:{} tx_seq:{} in the cache", + "[tx_id={}] can't find an updated tx: {} in the cache", tx.id, - tx.pid, - tx.tx_seq); + tx); // update_tx must return conflict only in this case, see expire_tx co_return tm_stm::op_status::conflict; } @@ -466,6 +479,11 @@ tm_stm::mark_tx_preparing( ss::future> tm_stm::mark_tx_aborting( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as aborting in term: {}", + tx_id, + expected_term); auto ptx = co_await get_tx(tx_id); if (!ptx.has_value()) { co_return ptx; } @@ -481,13 +499,18 @@ ss::future> tm_stm::mark_tx_aborting( ss::future> tm_stm::mark_tx_prepared( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as prepared in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { vlog( _ctx_log.trace, - "got {} on pulling tx {} to mark it prepared", - tx_opt.error(), - tx_id); + "[tx_id={}] error getting transaction - {}", + tx_id, + tx_opt.error()); co_return tx_opt; } auto tx = tx_opt.value(); @@ -498,10 +521,10 @@ ss::future> tm_stm::mark_tx_prepared( if (tx.status != check_status) { vlog( _ctx_log.warn, - "can't mark tx:{} pid:{} tx_seq:{} prepared wrong status {} != {}", - tx.id, - tx.pid, - tx.tx_seq, + "[tx_id={}] error marking transaction {} as prepared. 
Incorrect " + "status {} != {}", + tx_id, + tx, tx.status, check_status); co_return tm_stm::op_status::conflict; @@ -513,6 +536,11 @@ ss::future> tm_stm::mark_tx_prepared( ss::future> tm_stm::mark_tx_killed( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as killed in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { co_return tx_opt; @@ -530,6 +558,11 @@ ss::future> tm_stm::mark_tx_killed( ss::future> tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] resetting transfer of transaction in term: {}", + tx_id, + term); auto ptx = co_await get_tx(tx_id); if (!ptx.has_value()) { co_return ptx; @@ -541,17 +574,15 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { } vlog( _ctx_log.trace, - "observed a transferring tx:{} pid:{} etag:{} tx_seq:{} in term:{}", + "[tx_id={}] observed a transferring tx: {}, term: {}", tx_id, - tx.pid, - tx.etag, - tx.tx_seq, + tx, term); if (tx.etag == term) { // case 1 - Unlikely, just reset the transferring flag. vlog( _ctx_log.warn, - "tx: {} transferring within same term: {}, resetting.", + "[tx_id={}] transferring within same term: {}, resetting.", tx_id, tx.etag); } @@ -569,6 +600,11 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) { ss::future> tm_stm::mark_tx_ongoing( model::term_id expected_term, kafka::transactional_id tx_id) { + vlog( + _ctx_log.trace, + "[tx_id={}] marking transaction as ongoing in term: {}", + tx_id, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { co_return tx_opt; @@ -577,7 +613,7 @@ ss::future> tm_stm::mark_tx_ongoing( if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to update state data tx:{} pid:{} tx_seq:{} etag:{} " + "[tx_id={}] attempt to update state data pid:{} tx_seq:{} etag:{} " "assuming etag is {}", tx.id, tx.pid, @@ -601,7 +637,13 @@ ss::future tm_stm::re_register_producer( std::chrono::milliseconds transaction_timeout_ms, model::producer_identity pid, model::producer_identity last_pid) { - vlog(_ctx_log.trace, "Registering existing tx: id={}, pid={}", tx_id, pid); + vlog( + _ctx_log.trace, + "[tx_id={}] Registering existing transaction with new pid: {}, previous " + "pid: {}", + tx_id, + pid, + last_pid); auto tx_opt = co_await get_tx(tx_id); if (!tx_opt.has_value()) { @@ -645,7 +687,12 @@ ss::future tm_stm::do_register_new_producer( kafka::transactional_id tx_id, std::chrono::milliseconds transaction_timeout_ms, model::producer_identity pid) { - vlog(_ctx_log.trace, "Registering new tx: id={}, pid={}", tx_id, pid); + vlog( + _ctx_log.trace, + "[tx_id={}] Registering new transaction pid: {}, term: {}", + tx_id, + pid, + expected_term); auto tx_opt = co_await get_tx(tx_id); if (tx_opt.has_value()) { @@ -696,32 +743,32 @@ ss::future tm_stm::add_partitions( std::vector partitions) { auto tx_opt = find_tx(tx_id); if (!tx_opt) { - vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id); + vlog( + _ctx_log.warn, + "[tx_id={}] unable to find ongoing transaction", + tx_id); + co_return tm_stm::op_status::unknown; } auto tx = tx_opt.value(); if (tx.status != tm_transaction::tx_status::ongoing) { vlog( _ctx_log.warn, - "Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} " - "status:{}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, - tx.status); + "[tx_id={}] expected ongoing transaction, found: {} 
", + tx_id, + tx); + co_return tm_stm::op_status::unknown; } if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to add partitions to tx:{} pid:{} tx_seq:{} etag:{} " - "assuming etag is {}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + "[tx_id={}] adding partition fenced transaction: {} expected term: " + "{}", + tx_id, + tx, expected_term); + co_return tm_stm::op_status::unknown; } @@ -748,6 +795,12 @@ ss::future tm_stm::add_partitions( } tx.last_update_ts = clock_type::now(); _cache->set_mem(tx.etag, tx_id, tx); + vlog( + _ctx_log.trace, + "[tx_id={}] transaction: {} added with etag: {}", + tx_id, + tx, + expected_term); co_return tm_stm::op_status::success; } @@ -759,31 +812,27 @@ ss::future tm_stm::add_group( model::term_id etag) { auto tx_opt = find_tx(tx_id); if (!tx_opt) { - vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id); + vlog( + _ctx_log.trace, + "[tx_id={}] unable to find ongoing transaction", + tx_id); co_return tm_stm::op_status::unknown; } auto tx = tx_opt.value(); if (tx.status != tm_transaction::tx_status::ongoing) { vlog( _ctx_log.warn, - "Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} " - "status:{}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, - tx.status); + "[tx_id={}] expected ongoing transaction, found: {} ", + tx_id, + tx); co_return tm_stm::op_status::unknown; } if (tx.etag != expected_term) { vlog( _ctx_log.warn, - "An attempt to add group to tx:{} pid:{} tx_seq:{} etag:{} assuming " - "etag is {}", - tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + "[tx_id={}] adding group fenced transaction: {} expected term: {}", + tx_id, + tx, expected_term); co_return tm_stm::op_status::unknown; } @@ -970,16 +1019,21 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) { "broken model::record_batch_type::tm_update. 
expected tx.id {} got: {}", tx.id, tx_id); + vlog( + _ctx_log.trace, + "[tx_id={}] applying transaction: {} in term: {}", + tx.id, + tx, + _insync_term); if (tx.status == tm_transaction::tx_status::tombstone) { _cache->erase_log(tx.id); vlog( _ctx_log.trace, - "erasing {} (tombstone) pid:{} tx_seq:{} etag:{} in term:{} from mem", + "[tx_id={}] erasing (tombstone) transaction: {} in term: {} from " + "memory", tx.id, - tx.pid, - tx.tx_seq, - tx.etag, + tx, _insync_term); _cache->erase_mem(tx.id); _pid_tx_id.erase(tx.pid); @@ -995,16 +1049,12 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) { _cache->erase_mem(tx.id); vlog( _ctx_log.trace, - "erasing {} (log overwrite) pid:{} tx_seq:{} etag:{} from mem in " - "term:{} by pid:{} etag:{} tx_seq:{}", - old_tx.id, - old_tx.pid, - old_tx.tx_seq, - old_tx.etag, + "[tx_id={}] erasing (log overwrite) transaction: {} in term: {} " + "from memory by new transaction: {}", + tx.id, + old_tx, _insync_term, - tx.pid, - tx.etag, - tx.tx_seq); + tx); } } @@ -1162,7 +1212,8 @@ tm_stm::expire_tx(model::term_id term, kafka::transactional_id tx_id) { if (r0.has_value()) { vlog( _ctx_log.error, - "written tombstone should evict tx:{} from the cache", + "[tx_id={}] written tombstone should evict transaction from the " + "cache", tx_id); co_return tm_stm::op_status::unknown; } diff --git a/src/v/cluster/tm_stm_cache.cc b/src/v/cluster/tm_stm_cache.cc index 3bf19c0c020a6..c3afaccc261df 100644 --- a/src/v/cluster/tm_stm_cache.cc +++ b/src/v/cluster/tm_stm_cache.cc @@ -21,6 +21,8 @@ #include #include +#include + #include #include @@ -36,7 +38,7 @@ std::ostream& operator<<(std::ostream& o, const tm_transaction& tx) { std::optional tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { - vlog(txlog.trace, "looking for tx:{} etag:{}", tx_id, term); + vlog(txlog.trace, "[tx_id={}] looking for tx with term: {}", tx_id, term); if (_mem_term && _mem_term.value() == term) { // when a node fetches a tx withing a term it means that it was // elected as a leader with a higher term and the request should @@ -48,9 +50,8 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { if (entry_it == _state.end()) { vlog( txlog.trace, - "looking for tx:{} etag:{}: can't find term:{} in _state", + "[tx_id={}] looking for tx with etag: {}, term not found", tx_id, - term, term); // tm_stm_cache_entry isn't found it means a node memory was wiped // and we can't guess it's last state @@ -72,18 +73,17 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { if (log_it == _log_txes.end()) { vlog( txlog.trace, - "looking for tx:{} etag:{}: can't find tx:{} in log", + "[tx_id={}] looking for tx with etag: {}, can't find tx_id in log", tx_id, - term, - tx_id); + term); return std::nullopt; } if (log_it->second.tx.etag != term) { vlog( txlog.trace, - "looking for tx:{} etag:{}: found a tx with etag:{} pid:{} tx_seq:{} " - "(wrong etag)", + "[tx_id={}] looking for tx with etag: {}: found a tx with etag: {} " + "pid: {} tx_seq: {} (wrong etag)", tx_id, term, log_it->second.tx.etag, @@ -91,7 +91,12 @@ tm_stm_cache::find(model::term_id term, kafka::transactional_id tx_id) { log_it->second.tx.tx_seq); return std::nullopt; } - + vlog( + txlog.trace, + "[tx_id={}] found tx with etag: {} - {}", + tx_id, + term, + log_it->second.tx); return log_it->second.tx; } @@ -125,7 +130,7 @@ tm_stm_cache::find_log(kafka::transactional_id tx_id) { void tm_stm_cache::set_log(tm_transaction tx) { vlog( txlog.trace, - "saving tx:{} 
etag:{} pid:{} tx_seq:{} to log", + "[tx_id={}] saving tx with etag: {} pid: {} tx_seq: {} to log", tx.id, tx.etag, tx.pid, @@ -156,7 +161,7 @@ void tm_stm_cache::erase_log(kafka::transactional_id tx_id) { auto& tx = tx_it->second.tx; vlog( txlog.trace, - "erasing tx:{} etag:{} pid:{} tx_seq:{} from log", + "[tx_id={}] erasing tx with etag: {} pid: {} tx_seq: {} from log", tx.id, tx.etag, tx.pid, @@ -174,11 +179,15 @@ fragmented_vector tm_stm_cache::get_log_transactions() { void tm_stm_cache::set_mem( model::term_id term, kafka::transactional_id tx_id, tm_transaction tx) { - auto entry_it = _state.find(term); - if (entry_it == _state.end()) { - _state[term] = tm_stm_cache_entry{.term = term}; - entry_it = _state.find(term); - } + vlog( + txlog.trace, + "[tx_id={}] setting tx with etag: {} to {}", + tx_id, + term, + tx); + auto [entry_it, _] = _state.try_emplace( + term, tm_stm_cache_entry{.term = term}); + entry_it->second.txes[tx_id] = tx; if (!_mem_term) { @@ -217,7 +226,7 @@ void tm_stm_cache::erase_mem(kafka::transactional_id tx_id) { auto tx = tx_it->second; vlog( txlog.trace, - "erasing tx:{} etag:{} pid:{} tx_seq:{} from mem", + "[tx_id={}] erasing tx with etag: {} pid: {} tx_seq: {} from mem", tx.id, tx.etag, tx.pid, diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 3597b9f0a7e09..de36c3de517e6 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -81,7 +82,7 @@ static tm_transaction as_tx( kafka::transactional_id tx_id, model::term_id term, fetch_tx_reply reply) { vassert( reply.ec == tx_errc::none, - "can't extract a tx from a failed (ec:{}) reply", + "can't extract a tx from a failed (ec: {}) reply", reply.ec); tm_transaction tx; tx.id = tx_id; @@ -386,8 +387,7 @@ ss::future tx_gateway_frontend::do_hosts( vlog( txlog.trace, "[tx_id={}] stm hosts request, partition: " - "{}, " - "result: {}", + "{}, result: {}", tx_id, partition, result); @@ -430,6 +430,12 @@ ss::future tx_gateway_frontend::do_init_hosted_transactions( ss::future tx_gateway_frontend::fetch_tx_locally( kafka::transactional_id tx_id, model::term_id term, model::partition_id tm) { + vlog( + txlog.trace, + "[tx_id={}] fetching transaction locally term: {}, partition: {}", + tx_id, + term, + tm); auto map = [tx_id, term, tm]( tm_stm_cache_manager& cache_manager) -> std::optional { @@ -442,6 +448,9 @@ ss::future tx_gateway_frontend::fetch_tx_locally( auto tx_opt = co_await _tm_stm_cache_manager.map_reduce0( map, std::optional{}, reduce); + vlog( + txlog.trace, "[tx_id={}] fetching transaction result: {}", tx_id, tx_opt); + if (!tx_opt) { co_return fetch_tx_reply(tx_errc::tx_not_found); } @@ -480,18 +489,24 @@ ss::future tx_gateway_frontend::fetch_tx_locally( reply.partitions.reserve(tx.partitions.size()); for (auto& p : tx.partitions) { - reply.partitions.push_back( - fetch_tx_reply::tx_partition(p.ntp, p.etag, p.topic_revision)); + reply.partitions.emplace_back(p.ntp, p.etag, p.topic_revision); } reply.groups.reserve(tx.groups.size()); for (auto& g : tx.groups) { - reply.groups.push_back(fetch_tx_reply::tx_group(g.group_id, g.etag)); + reply.groups.emplace_back(g.group_id, g.etag); } co_return reply; } ss::future> tx_gateway_frontend::fetch_tx( kafka::transactional_id tx_id, model::term_id term, model::partition_id tm) { + vlog( + txlog.trace, + "[tx_id={}] fetching transaction in term: {} from partition: {}", + tx_id, + term, + tm); + auto outcome = 
ss::make_lw_shared< available_promise>>(); ssx::spawn_with_gate(_gate, [this, tx_id, term, outcome, tm] { @@ -527,7 +542,7 @@ ss::future<> tx_gateway_frontend::dispatch_fetch_tx( if (node_id == self) { vlog( txlog.trace, - "fetching tx:{} in term:{} tm:{} from {}", + "[tx_id={}] fetching tx in term: {} partition_id: {} from: {}", tx_id, term, tm, @@ -538,12 +553,13 @@ [tx_id, term, outcome, node_id, tm](fetch_tx_reply reply) { vlog( txlog.trace, - "got {} on fetching tx:{} in term:{} tm:{} from {}", - reply.ec, + "[tx_id={}] fetch_tx in term: {} on " + "partition_id {} from {} returned - {}", tx_id, term, tm, - node_id); + node_id, + reply.ec); if (reply.ec == tx_errc::none) { if (!outcome->available()) { auto tx = as_tx(tx_id, term, reply); @@ -570,7 +586,8 @@ ss::future tx_gateway_frontend::dispatch_fetch_tx( outcome) { vlog( txlog.trace, - "fetching tx:{} in term:{} tm: {} from {}", + "[tx_id={}] dispatching fetch_tx request in term: {} for partition: {} " + "to {}", tx_id, term, tm, @@ -591,21 +608,21 @@ ss::future tx_gateway_frontend::dispatch_fetch_tx( if (r.has_error()) { vlog( txlog.warn, - "got error {} on fetching tx:{} in term:{} from {}", - r.error(), + "[tx_id={}] error fetching tx in term {} from {} - {}", tx_id, term, - target); + target, + r.error()); return fetch_tx_reply(tx_errc::unknown_server_error); } auto reply = r.value(); vlog( txlog.trace, - "got {} on fetching tx:{} in term:{} from {}", - reply.ec, + "[tx_id={}] fetching tx in term {} from {} result: {}", tx_id, term, - target); + target, + reply.ec); if (reply.ec == tx_errc::none) { if (!outcome->available()) { auto tx = as_tx(tx_id, term, reply); @@ -767,6 +784,7 @@ ss::future tx_gateway_frontend::do_try_abort( .then([this, stm, pid, tx_seq, timeout]( checked term_opt) { if (!term_opt.has_value()) { + vlog(txlog.debug, "stm barrier error: {}", term_opt.error()); if (term_opt.error() == tm_stm::op_status::not_leader) { return ss::make_ready_future( try_abort_reply{tx_errc::not_coordinator}); @@ -779,7 +797,7 @@ ss::future tx_gateway_frontend::do_try_abort( if (!tx_id_opt) { vlog( txlog.trace, - "can't find tx by pid:{} considering it aborted", + "can't find tx by pid: {} considering it aborted", pid); return ss::make_ready_future( try_abort_reply::make_aborted()); @@ -841,20 +859,22 @@ ss::future tx_gateway_frontend::do_try_abort( if (!term_opt.has_value()) { if (term_opt.error() == tm_stm::op_status::not_leader) { vlog( - txlog.trace, - "this node isn't a leader for tx:{} coordinator", - tx_id); + txlog.trace, "[tx_id={}] this node isn't a coordinator", tx_id); co_return try_abort_reply{tx_errc::not_coordinator}; } vlog( txlog.warn, - "got error {} on sync'ing in-memory state", - term_opt.error(), - tx_id); + "[tx_id={}] error syncing in memory state - {}", + tx_id, + term_opt.error()); co_return try_abort_reply{tx_errc::timeout}; } if (term_opt.value() != term) { + vlog( + txlog.info, + "[tx_id={}] error syncing in memory state, synced term does not " + "match the expected term", + tx_id); co_return try_abort_reply{tx_errc::not_coordinator}; } @@ -863,7 +883,7 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx_opt.error() == tm_stm::op_status::not_found) { vlog( txlog.trace, - "can't find a tx by id:{} (pid:{} tx_seq:{}) considering it " + "[tx_id={}] can't find a tx (pid: {} tx_seq: {}) considering it " "aborted", tx_id, pid, @@ -872,9 +892,9 @@ } else { vlog( txlog.warn, - "got error {} on lookin up tx:{}", - tx_opt.error(), - tx_id); 
+ "[tx_id={}] error looking up the transaction - {}", + tx_id, + tx_opt.error()); co_return try_abort_reply{tx_errc::unknown_server_error}; } } @@ -886,9 +906,9 @@ ss::future tx_gateway_frontend::do_try_abort( if (!tx_opt.has_value()) { vlog( txlog.warn, - "got error {} on rehydrating tx:{}", - tx_opt.error(), - tx_id); + "[tx_id={}] error rehydrating transaction - {}", + tx_id, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return try_abort_reply{tx_errc::not_coordinator}; } @@ -896,6 +916,13 @@ ss::future tx_gateway_frontend::do_try_abort( } if (tx.etag > term) { + vlog( + txlog.trace, + "[tx_id={}] fenced aborting transaction, etag: {} is greater than " + "current term: {}", + tx_id, + tx.etag, + term); // tx was written by a future leader meaning current // node can't be a leader co_return try_abort_reply{tx_errc::not_coordinator}; @@ -925,8 +952,8 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx.pid != pid || tx.tx_seq != tx_seq) { vlog( txlog.trace, - "found tx:{} has pid:{} tx_seq:{} (expecting pid:{} tx_seq:{}) " - "considering it aborted", + "[tx_id={}] found tx has pid: {} tx_seq: {} (expecting pid: {} " + "tx_seq: {}) considering it aborted", tx_id, tx.pid, tx.tx_seq, @@ -938,7 +965,8 @@ ss::future tx_gateway_frontend::do_try_abort( if (tx.status == tm_transaction::tx_status::prepared) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} is prepared => considering it committed", + "[tx_id={}] pid: {} tx_seq: {} is prepared => considering it " + "committed", tx_id, tx.pid, tx.tx_seq); @@ -949,7 +977,8 @@ ss::future tx_gateway_frontend::do_try_abort( || tx.status == tm_transaction::tx_status::ready) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} has status:{} => considering it aborted", + "[tx_id={}] pid: {} tx_seq: {} has status: {} => considering it " + "aborted", tx_id, tx.pid, tx.tx_seq, @@ -962,7 +991,8 @@ ss::future tx_gateway_frontend::do_try_abort( || tx.status == tm_transaction::tx_status::ongoing) { vlog( txlog.trace, - "tx id:{} pid:{} tx_seq:{} is ongoing => forcing it to be aborted", + "[tx_id={}] pid: {} tx_seq: {} is ongoing => forcing it to be " + "aborted", tx_id, tx.pid, tx.tx_seq); @@ -975,7 +1005,7 @@ ss::future tx_gateway_frontend::do_try_abort( } co_return try_abort_reply::make_aborted(); } else { - vlog(txlog.error, "unknown tx status: {}", tx.status); + vlog(txlog.error, "[tx_id={}] unknown status: {}", tx_id, tx.status); co_return try_abort_reply{tx_errc::unknown_server_error}; } } @@ -1102,7 +1132,7 @@ ss::future tx_gateway_frontend::init_tm_tx_locally( model::partition_id tm) { vlog( txlog.trace, - "[tx_id={}] processing name:init_tm_tx, timeout:{}", + "[tx_id={}] processing name:init_tm_tx, timeout: {}", tx_id, transaction_timeout_ms); @@ -1168,7 +1198,7 @@ ss::future tx_gateway_frontend::init_tm_tx_locally( vlog( txlog.trace, - "[tx_id={}] sending name:init_tm_tx, pid:{}, ec: {}", + "[tx_id={}] sending name:init_tm_tx, pid: {}, ec: {}", tx_id, reply.pid, reply.ec); @@ -1272,17 +1302,14 @@ ss::future tx_gateway_frontend::limit_init_tm_tx( auto term_opt = co_await stm->sync(); if (!term_opt.has_value()) { if (term_opt.error() == tm_stm::op_status::not_leader) { - vlog( - txlog.trace, - "this node isn't a leader for tx.id={} coordinator", - tx_id); + vlog(txlog.trace, "[tx_id={}] node is not a leader", tx_id); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } vlog( txlog.warn, - "got error {} on syncing (initializing tx.id={})", - term_opt.error(), - tx_id); + "[tx_id={}] error syncing stm - {}", + tx_id, + 
term_opt.error()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } auto term = term_opt.value(); @@ -1370,17 +1397,23 @@ ss::future tx_gateway_frontend::do_init_tm_tx( model::timeout_clock::duration timeout, model::producer_identity expected_pid) { if (!stm->hosts(tx_id)) { + vlog( + txlog.debug, + "[tx_id={}] stm on partition {} doesn't host this transaction", + tx_id, + stm->get_partition()); co_return tx_errc::not_coordinator; } auto r0 = co_await get_tx(term, stm, tx_id, timeout); + if (!r0.has_value()) { if (r0.error() != tx_errc::tx_not_found) { vlog( txlog.warn, - "got error {} on loading tx.id={}", - r0.error(), - tx_id); + "[tx_id={}] error getting transaction: {}", + tx_id, + r0.error()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } @@ -1388,7 +1421,11 @@ allocate_id_reply pid_reply = co_await _id_allocator_frontend.local().allocate_id(timeout); if (pid_reply.ec != errc::success) { - vlog(txlog.warn, "allocate_id failed with {}", pid_reply.ec); + vlog( + txlog.warn, + "[tx_id={}] allocate_id failed - {}", + tx_id, + pid_reply.ec); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } @@ -1402,23 +1439,20 @@ } else if (op_status == tm_stm::op_status::conflict) { vlog( txlog.warn, - "got conflict on registering new producer {} for tx.id={}", - pid, - tx_id); + "[tx_id={}] got conflict on registering new producer {}", + tx_id, + pid); reply.ec = tx_errc::conflict; } else if (op_status == tm_stm::op_status::not_leader) { - vlog( - txlog.warn, - "this node isn't a leader for tx.id={} coordinator", - tx_id); + vlog(txlog.warn, "[tx_id={}] this node is not coordinator", tx_id); reply.ec = tx_errc::not_coordinator; } else { vlog( txlog.warn, - "got {} on registering new producer {} for tx.id={}", - op_status, + "[tx_id={}] error registering producer with pid: {} - {}", + tx_id, pid, - tx_id); + op_status); reply.ec = tx_errc::invalid_txn_state; } co_return reply; @@ -1426,14 +1460,22 @@ auto tx = r0.value(); if (!is_valid_producer(tx, expected_pid)) { + vlog( + txlog.info, + "[tx_id={}] producer with pid: {} is not valid for transaction: {}", + tx_id, + expected_pid, + tx); co_return init_tm_tx_reply{tx_errc::invalid_producer_epoch}; } checked r(tx); if (tx.status == tm_transaction::tx_status::ongoing) { + vlog(txlog.info, "[tx_id={}] tx is ongoing, aborting", tx_id); r = co_await do_abort_tm_tx(term, stm, tx, timeout); } else if (tx.status == tm_transaction::tx_status::preparing) { + vlog(txlog.info, "[tx_id={}] tx is preparing, aborting", tx_id); // preparing is obsolete, also it isn't acked until // it's prepared si it's safe to abort it r = co_await do_abort_tm_tx(term, stm, tx, timeout); @@ -1442,10 +1484,10 @@ if (!r.has_value()) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r.error(), + "[tx_id={}] error rolling previous transaction with status: {} - {}", tx_id, - tx.status); + tx.status, + r.error()); co_return init_tm_tx_reply{r.error()}; } @@ -1455,6 +1497,7 @@ if (expected_pid == model::unknown_pid) { if (is_max_epoch(tx.pid.epoch)) { + vlog(txlog.trace, "[tx_id={}] allocating new producer id", tx_id); allocate_id_reply pid_reply = co_await _id_allocator_frontend.local().allocate_id(timeout); reply.pid = model::producer_identity{pid_reply.id, 0}; @@ -1480,7 +1523,12 @@ 
co_return init_tm_tx_reply{tx_errc::invalid_producer_epoch}; } } - + vlog( + txlog.trace, + "[tx_id={}] re-registering producer with new pid: {}, last_pid: {}", + tx.id, + reply.pid, + last_pid); auto op_status = co_await stm->re_register_producer( term, tx.id, transaction_timeout_ms, reply.pid, last_pid); if (op_status == tm_stm::op_status::success) { @@ -1492,10 +1540,10 @@ ss::future tx_gateway_frontend::do_init_tm_tx( } else { vlog( txlog.warn, - "got error {} on re-registering a producer {} for tx.id={}", - op_status, + "[tx_id={}] error re-registering a producer {} - {}", + tx.id, reply.pid, - tx.id); + op_status); reply.ec = tx_errc::invalid_txn_state; } co_return reply; @@ -1505,6 +1553,12 @@ ss::future tx_gateway_frontend::add_partition_to_tx( add_paritions_tx_request request, model::timeout_clock::duration timeout) { auto tx_ntp_opt = co_await ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.trace, + "[tx_id={}] unable to find ntp, producer_id: {}, epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return make_add_partitions_error_response( request, tx_errc::coordinator_not_available); } @@ -1512,11 +1566,23 @@ ss::future tx_gateway_frontend::add_partition_to_tx( auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.trace, "can't find a shard for {}", tx_ntp); + vlog( + txlog.trace, + "[tx_id={}] can't find a shard for {}, producer_id: {}, epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return make_add_partitions_error_response( request, tx_errc::coordinator_not_available); } - + vlog( + txlog.trace, + "[tx_id={}] adding partition to tx. pid: {}, epoch: {}, topics: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch, + request.topics); co_return co_await container().invoke_on( *shard, _ssg, @@ -1587,10 +1653,10 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( if (!r.has_value()) { vlog( txlog.trace, - "got {} on pulling ongoing tx:{} pid:{}", - r.error(), + "[tx_id={}] error getting ongoing transaction for pid: {} - {}", request.transactional_id, - pid); + pid, + r.error()); co_return make_add_partitions_error_response(request, r.error()); } auto tx = r.value(); @@ -1696,7 +1762,12 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( auto status = co_await stm->add_partitions(term, tx.id, partitions); auto has_added = status == tm_stm::op_status::success; if (!has_added) { - vlog(txlog.warn, "adding partitions failed with {}", status); + vlog( + txlog.warn, + "[tx_id={}] adding partitions failed pid: {} - {}", + request.transactional_id, + pid, + status); } for (auto& br : brs) { auto topic_it = std::find_if( @@ -1712,7 +1783,10 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( if (br.ec != tx_errc::none) { vlog( txlog.warn, - "begin_tx request to {} failed with {}", + "[tx_id={}] begin_tx request for pid: {} at ntp: {} failed - " + "{}", + request.transactional_id, + pid, br.ntp, br.ec); } @@ -1725,8 +1799,24 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( ss::future tx_gateway_frontend::add_offsets_to_tx( add_offsets_tx_request request, model::timeout_clock::duration timeout) { + vlog( + txlog.trace, + "[tx_id={}] adding offsets to tx, group_id: {}, producer id: " + "{}, producer epoch: {}", + request.transactional_id, + request.group_id, + request.producer_id, + request.producer_epoch); + auto tx_ntp_opt = co_await 
ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.warn, + "[tx_id={}] unable to find coordinator ntp, producer id: {}, " + "producer epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return add_offsets_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1734,7 +1824,14 @@ auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tx_ntp); + vlog( + txlog.warn, + "[tx_id={}] can't find a shard for {} producer id: {}, " + "producer epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return add_offsets_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1804,11 +1901,11 @@ ss::future tx_gateway_frontend::do_add_offsets_to_tx( term, stm, pid, request.transactional_id, timeout); if (!r.has_value()) { vlog( - txlog.trace, - "got {} on pulling ongoing tx for pid:{} {}", - r.error(), + txlog.warn, + "[tx_id={}] error getting ongoing transaction for pid: {} - {}", + request.transactional_id, pid, - request.transactional_id); + r.error()); co_return add_offsets_tx_reply{.error_code = r.error()}; } auto tx = r.value(); @@ -1816,7 +1913,15 @@ auto group_info = co_await _rm_group_proxy->begin_group_tx( request.group_id, pid, tx.tx_seq, tx.timeout_ms, stm->get_partition()); if (group_info.ec != tx_errc::none) { - vlog(txlog.warn, "error on begining group tx: {}", group_info.ec); + vlog( + txlog.warn, + "[tx_id={}] error starting group transaction for pid: {}, group: {} " + "- {}", + request.transactional_id, + pid, + request.group_id, + group_info.ec); + co_return add_offsets_tx_reply{.error_code = group_info.ec}; } @@ -1824,7 +1929,12 @@ term, tx.id, request.group_id, group_info.etag); auto has_added = status == tm_stm::op_status::success; if (!has_added) { - vlog(txlog.warn, "can't add group to tm_stm: {}", status); + vlog( + txlog.warn, + "[tx_id={}] error adding group to tm_stm for pid: {} group: {} - {}", + request.transactional_id, + pid, + request.group_id, + status); co_return add_offsets_tx_reply{ .error_code = tx_errc::invalid_txn_state}; } @@ -1833,8 +1943,24 @@ ss::future tx_gateway_frontend::end_txn( end_tx_request request, model::timeout_clock::duration timeout) { + vlog( + txlog.trace, + "[tx_id={}] end transaction. 
producer id: {}, producer epoch: {}, " "committed: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch, + request.committed); + auto tx_ntp_opt = co_await ntp_for_tx_id(request.transactional_id); if (!tx_ntp_opt) { + vlog( + txlog.trace, + "[tx_id={}] cannot find coordinator ntp, producer id: {}, producer " + "epoch: {}", + request.transactional_id, + request.producer_id, + request.producer_epoch); co_return end_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1842,7 +1968,14 @@ auto shard = _shard_table.local().shard_for(tx_ntp); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tx_ntp); + vlog( + txlog.warn, + "[tx_id={}] can't find a shard for {}, producer id: {}, producer " + "epoch: {}", + request.transactional_id, + tx_ntp, + request.producer_id, + request.producer_epoch); co_return end_tx_reply{ .error_code = tx_errc::coordinator_not_available}; } @@ -1874,11 +2007,11 @@ ss::future tx_gateway_frontend::do_end_txn( model::producer_identity pid{ request.producer_id, request.producer_epoch}; vlog( - txlog.trace, - "got {} on pulling a stm for tx:{} pid:{}", - r.error(), + txlog.warn, + "[tx_id={}] error getting transaction from coordinator pid: {} - {}", request.transactional_id, - pid); + pid, + r.error()); return ss::make_ready_future( end_tx_reply{.error_code = r.error()}); } @@ -1897,7 +2030,7 @@ return ss::make_ready_future( end_tx_reply{.error_code = tx_errc::coordinator_not_available}); } - vlog(txlog.trace, "re-entered tm_stm's gate"); + auto h = stm->gate().hold(); ssx::spawn_with_gate( @@ -1939,13 +2072,13 @@ if (!outcome->available()) { vlog( txlog.warn, - "outcome for tx:{} pid:{} isn't set", + "[tx_id={}] outcome for transaction is missing, pid: {}", tx_id, pid); outcome->set_value( tx_errc::unknown_server_error); } - vlog(txlog.trace, "left tm_stm's gate"); }); }) .finally([u = std::move(unit)] {}); @@ -1971,7 +2104,7 @@ tx_gateway_frontend::do_end_txn( } catch (...) { vlog( txlog.warn, - "sync on end_txn for tx:{} pid:{} failed with {}", + "[tx_id={}] sync on end_txn failed pid: {} - {}", request.transactional_id, pid, std::current_exception()); @@ -1985,10 +2118,10 @@ } vlog( txlog.warn, - "sync on end_txn for tx:{} pid:{} failed with {}", + "[tx_id={}] sync on end_txn failed pid: {} - {}", request.transactional_id, pid, term_opt.error()); outcome->set_value(tx_errc::invalid_txn_state); co_return tx_errc::invalid_txn_state; } @@ -2005,7 +2138,9 @@ tx_gateway_frontend::do_end_txn( if (err == tx_errc::tx_not_found) { vlog( txlog.warn, - "can't find an ongoing tx:{} pid:{} to commit / abort", + "[tx_id={}] can't find an ongoing transaction pid: {} to commit / " + "abort", request.transactional_id, pid); err = tx_errc::unknown_server_error; @@ -2020,8 +2155,8 @@ if (tx.status == tm_transaction::tx_status::killed) { vlog( txlog.warn, - "can't commit an expired tx:{} pid:{} etag:{} tx_seq:{} in " - "term:{}", + "[tx_id={}] can't commit an expired transaction pid: {} etag: {} " + "tx_seq: {} in term: {}", tx.id, tx.pid, tx.etag, @@ -2038,10 +2173,11 @@ } catch (...) 
{ vlog( txlog.error, - "committing a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + "[tx_id={}] error committing transaction pid: {} etag: {} " + "tx_seq: {} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, std::current_exception()); if (!outcome->available()) { @@ -2052,8 +2188,8 @@ tx_gateway_frontend::do_end_txn( } else { vlog( txlog.warn, - "an ongoing tx:{} pid:{} etag:{} tx_seq:{} in term:{} has " - "unexpected status: {}", + "[tx_id={}] an ongoing transaction pid: {} etag: {} tx_seq: {} " + "in term: {} has an unexpected status: {}", tx.id, tx.pid, tx.etag, @@ -2067,8 +2203,8 @@ tx_gateway_frontend::do_end_txn( if (tx.status == tm_transaction::tx_status::killed) { vlog( txlog.warn, - "can't abort an expired tx:{} pid:{} etag:{} tx_seq:{} in " - "term:{}", + "[tx_id={}] can't abort an expired transaction pid: {} etag: {} " + "tx_seq: {} in term: {}", tx.id, tx.pid, tx.etag, @@ -2082,10 +2218,11 @@ tx_gateway_frontend::do_end_txn( } catch (...) { vlog( txlog.error, - "aborting a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + "[tx_id={}] error aborting transaction pid: {} etag: {} tx_seq: " + "{} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, std::current_exception()); outcome->set_value(tx_errc::unknown_server_error); @@ -2096,11 +2233,12 @@ tx_gateway_frontend::do_end_txn( } else { auto ec = r.error(); vlog( - txlog.info, - "aborting a tx:{} etag:{} pid:{} tx_seq:{} failed with {}", + txlog.error, + "[tx_id={}] error aborting transaction pid: {} etag: {} tx_seq: " + "{} - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, ec); outcome->set_value(ec); @@ -2117,7 +2255,7 @@ tx_gateway_frontend::do_end_txn( if (!ongoing_tx.has_value()) { vlog( txlog.warn, - "can't mark {} as ongoing error:{}", + "[tx_id={}] can't mark transaction as ongoing - {}", tx.id, ongoing_tx.error()); co_return tx_errc::unknown_server_error; @@ -2146,17 +2284,17 @@ tx_gateway_frontend::remove_deleted_partitions_from_tx( if (result) { vlog( txlog.info, - "Deleted non existent partition {} from tx {}", - part.ntp, - tx); + "[tx_id={}] Deleted non existent partition {} from transaction", + tx.id, + part.ntp); tx = result.value(); } else { vlog( txlog.debug, - "Error {} deleting partition {} from tx {}", - result.error(), + "[tx_id={}] Error deleting partition {} from transaction - {}", + tx.id, part.ntp, - tx); + result.error()); break; } } @@ -2172,12 +2310,12 @@ tx_gateway_frontend::do_abort_tm_tx( if (!stm->is_actual_term(expected_term)) { vlog( txlog.trace, - "txn coordinator isn't synced with term:{} tx:{} etag:{} pid:{} " - "tx_seq:{}", - expected_term, + "[tx_id={}] txn coordinator isn't synced with term: {} pid: {} etag: " + "{} tx_seq: {}", tx.id, - tx.etag, + expected_term, tx.pid, + tx.etag, tx.tx_seq); co_return tx_errc::not_coordinator; } @@ -2192,12 +2330,12 @@ tx_gateway_frontend::do_abort_tm_tx( && tx.status != tm_transaction::tx_status::preparing) { vlog( txlog.warn, - "abort encontered a tx with unexpected status:{} (tx:{} etag:{} " - "pid:{} tx_seq:{}) in term:{}", - tx.status, + "[tx_id={}] abort encountered a transaction with unexpected status: " + "{}, pid: {}, etag: {} tx_seq: {} in term: {}", tx.id, - tx.etag, + tx.status, tx.pid, + tx.etag, tx.tx_seq, expected_term); co_return tx_errc::invalid_txn_state; @@ -2208,15 +2346,15 @@ tx_gateway_frontend::do_abort_tm_tx( if (is_fetch_tx_supported()) { vlog( txlog.trace, - "aborting tx:{} etag:{} pid:{} tx_seq:{} status:{} failed with " - "{} in term:{}", + "[tx_id={}] aborting transaction pid: {} etag: {} tx_seq: {} " + 
"status: {} in term: {} failed - {}", tx.id, - tx.etag, tx.pid, + tx.etag, tx.tx_seq, tx.status, - changed_tx.error(), - expected_term); + expected_term, + changed_tx.error()); co_return tx_errc::coordinator_not_available; } else { if (changed_tx.error() == tm_stm::op_status::not_leader) { @@ -2333,7 +2471,8 @@ tx_gateway_frontend::do_commit_tm_tx( vlog( txlog.trace, - "marking tx_id:{} pid:{} tx_seq:{} etag:{} as prepared in term:{}", + "[tx_id={}] marking transaction pid: {} tx_seq: {} etag: {} as prepared " + "in term: {}", tx.id, tx.pid, tx.tx_seq, @@ -2343,11 +2482,13 @@ tx_gateway_frontend::do_commit_tm_tx( if (!changed_tx.has_value()) { vlog( txlog.trace, - "got {} on committing {} pid:{} tx_seq:{}", - changed_tx.error(), + "[tx_id={}] error committing transaction pid: {} etag: {} tx_seq: {} " + "- {}", tx.id, tx.pid, - tx.tx_seq); + tx.etag, + tx.tx_seq, + changed_tx.error()); if (!is_fetch_tx_supported()) { if (changed_tx.error() == tm_stm::op_status::not_leader) { @@ -2412,8 +2553,9 @@ tx_gateway_frontend::recommit_tm_tx( rejected = true; vlog( txlog.warn, - "commit_tx on consumer groups tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} was rejected", + "[tx_id={}] commit_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} " + "status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2424,8 +2566,8 @@ tx_gateway_frontend::recommit_tm_tx( failed = true; vlog( txlog.trace, - "commit_tx on consumer groups tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} failed with {}", + "[tx_id={}] commit_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2442,8 +2584,8 @@ tx_gateway_frontend::recommit_tm_tx( rejected = true; vlog( txlog.warn, - "commit_tx on data partition tx:{} etag:{} pid:{} tx_seq:{} " - "status:{} in term:{} was rejected", + "[tx_id={}] commit_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2454,8 +2596,8 @@ tx_gateway_frontend::recommit_tm_tx( failed = true; vlog( txlog.trace, - "commit_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + "[tx_id={}] commit_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2478,8 +2620,8 @@ tx_gateway_frontend::recommit_tm_tx( // request won't be ever processed vlog( txlog.warn, - "remote commit of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " - "is rejected", + "[tx_id={}] remote commit etag: {} pid: {} tx_seq: {} in term: " + "{} rejected", tx.id, tx.etag, tx.pid, @@ -2490,7 +2632,13 @@ tx_gateway_frontend::recommit_tm_tx( tx = co_await remove_deleted_partitions_from_tx(stm, expected_term, tx); if (co_await sleep_abortable(delay_ms, _as)) { - vlog(txlog.trace, "retrying re-commit pid:{}", tx.pid); + vlog( + txlog.trace, + "[tx_id={}] retrying re-commit etag: {} pid: {} tx_seq: {}", + tx.id, + tx.etag, + tx.pid, + tx.tx_seq); } else { break; } @@ -2498,7 +2646,7 @@ tx_gateway_frontend::recommit_tm_tx( if (!done) { vlog( txlog.warn, - "remote commit of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote commit etag: {} pid: {} tx_seq: {} in term: {} " "failed", tx.id, tx.etag, @@ -2542,8 +2690,8 @@ tx_gateway_frontend::reabort_tm_tx( rejected = true; vlog( txlog.warn, - "abort_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} was rejected", + "[tx_id={}] abort_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: 
{} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2553,9 +2701,9 @@ } else if (r.ec != tx_errc::none) { failed = true; vlog( - txlog.trace, - "abort_tx on data partition tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + txlog.info, + "[tx_id={}] abort_tx on data partition etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2571,8 +2719,8 @@ rejected = true; vlog( txlog.warn, - "abort_tx on consumer groups tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} was rejected", + "[tx_id={}] abort_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} was rejected", tx.id, tx.etag, tx.pid, @@ -2583,8 +2731,8 @@ failed = true; vlog( txlog.trace, - "abort_tx on consumer groups tx:{} etag:{} pid:{} " - "tx_seq:{} status:{} in term:{} failed with {}", + "[tx_id={}] abort_tx on consumer groups etag: {} pid: {} " + "tx_seq: {} status: {} in term: {} failed with {}", tx.id, tx.etag, tx.pid, @@ -2602,7 +2750,7 @@ if (rejected && !failed) { vlog( txlog.warn, - "remote abort of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote abort etag: {} pid: {} tx_seq: {} in term: {} " "was rejected", tx.id, tx.etag, @@ -2619,7 +2767,7 @@ if (!done) { vlog( txlog.warn, - "remote abort of tx:{} etag:{} pid:{} tx_seq:{} in term:{} " + "[tx_id={}] remote abort etag: {} pid: {} tx_seq: {} in term: {} " "failed", tx.id, tx.etag, @@ -2649,21 +2797,17 @@ ss::future> tx_gateway_frontend::bump_etag( if (!r1.has_value()) { // we invoke bump when we have the most up-to-date info // so reject isn't possible + vlog( + txlog.error, + "[tx_id={}] error rolling previous transaction with status: {} - " + "{}", + tx.id, + tx.status, + r1.error()); if (r1.error() == tx_errc::request_rejected) { - vlog( - txlog.error, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); co_return tx_errc::invalid_txn_state; } - vlog( - txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); + // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; } @@ -2673,10 +2817,10 @@ auto r2 = co_await stm->update_tx(tx, term); if (!r2) { vlog( - txlog.info, - "got {} on bumping etag on reading {}", - r1.error(), - tx.id); + txlog.error, + "[tx_id={}] error bumping etag - {}", + tx.id, + r2.error()); co_return tx_errc::not_coordinator; } co_return r2.value(); @@ -2705,10 +2849,10 @@ ss::future> tx_gateway_frontend::forget_tx( if (!r1.has_value() && r1.error() != tx_errc::request_rejected) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), + "[tx_id={}] error rolling previous transaction with status: {} - {}", tx.id, - tx.status); + tx.status, + r1.error()); // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; } auto ec = co_await stm->expire_tx(term, tx.id); if (ec != tm_stm::op_status::success) { - vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id); + vlog( + txlog.warn, "[tx_id={}] error expiring transaction - {}", tx.id, ec); co_return tx_errc::not_coordinator; } @@ -2736,9 +2881,9 @@ ss::future> tx_gateway_frontend::get_tx( } else { 
vlog( txlog.warn, - "got error {} on lookin up tx.id={}", - tx_opt.error(), - tid); + "[tx_id={}] error getting transaction - {}", + tid, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return tx_errc::not_coordinator; } @@ -2751,9 +2896,9 @@ if (!tx_opt.has_value()) { vlog( txlog.warn, - "got error {} on rehydrating tx.id={}", - tx_opt.error(), - tid); + "[tx_id={}] error rehydrating transaction info - {}", + tid, + tx_opt.error()); // any error on lookin up a tx is a retriable error co_return tx_errc::not_coordinator; } @@ -2794,10 +2939,12 @@ ss::future> tx_gateway_frontend::get_tx( if (!r1.has_value()) { vlog( txlog.warn, - "got error {} on rolling previous tx.id={} with status={}", - r1.error(), - tx.id, - tx.status); + "[tx_id={}] error rolling previous transaction with status: " + "{} - {}", + tid, + tx.status, + r1.error()); + // until any decision is made it's ok to ask user retry co_return tx_errc::not_coordinator; } @@ -2808,13 +2955,10 @@ if (!r1) { vlog( txlog.warn, - "Can't fetch cached state of the persisted tx (pid:{} etag:{} " - "tx_seq:{} status:{}); true state is unknown, terminating session to " - "avoid data loss", - tx.pid, - tx.etag, - tx.tx_seq, - tx.status); + "[tx_id={}] Can't fetch cached state of the persisted tx: {}, true " - "state is unknown, terminating session to avoid data loss", + tx.id, + tx); co_return co_await forget_tx(term, stm, tx, timeout); } @@ -2830,9 +2974,10 @@ if (old_tx.pid != tx.pid) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of " - "the persisted (pid:{} etag:{} tx_seq:{} status:{}) should " - "have same pid; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: {} " + "status: {}) of the persisted (pid: {} etag: {} tx_seq: {} " + "status: {}) should have the same pid, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2850,10 +2995,11 @@ if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2869,10 +3015,11 @@ if (old_tx.status != tx.status) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have same status; terminating " - "session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "same status, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2882,6 +3029,7 @@ tx.etag, tx.tx_seq, tx.status); + co_return co_await forget_tx(term, stm, tx, timeout); } } @@ -2893,9 +3041,10 @@ if (old_tx.pid != tx.pid) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of the " - "persisted (pid:{} 
etag:{} tx_seq:{} status:{}) should have same " - "pid; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) should have same pid, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2905,6 +3054,7 @@ ss::future> tx_gateway_frontend::get_tx( tx.etag, tx.tx_seq, tx.status); + co_return co_await forget_tx(term, stm, tx, timeout); } @@ -2921,10 +3071,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -2944,17 +3095,20 @@ ss::future> tx_gateway_frontend::get_tx( && old_tx.status != tm_transaction::tx_status::aborting) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have either ongoing or " - "aborting status; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "either ongoing or aborting status; terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, old_tx.tx_seq, + old_tx.status, tx.pid, tx.etag, - tx.tx_seq); + tx.tx_seq, + tx.status); co_return co_await forget_tx(term, stm, tx, timeout); } old_tx = tx; @@ -2977,10 +3131,11 @@ ss::future> tx_gateway_frontend::get_tx( if (old_tx.status != tm_transaction::tx_status::ongoing) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "may be in the tx seq future only if it's ongoing; " - "terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) may be in the tx seq future only " + "if it's ongoing, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, @@ -3000,17 +3155,20 @@ ss::future> tx_gateway_frontend::get_tx( && old_tx.status != tm_transaction::tx_status::prepared) { vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) " - "of the persisted (pid:{} etag:{} tx_seq:{} status:{}) " - "with the same tx seq must have either ongoing or " - "prepared status; terminating session", + "[tx_id={}] A cached tx (tx: {} pid: {} etag: {} tx_seq: " + "{} status: {}) of the persisted (pid: {} etag: {} " + "tx_seq: {} status: {}) with the same tx seq must have " + "either ongoing or prepared status, terminating session", + tx.id, old_tx.id, old_tx.pid, old_tx.etag, old_tx.tx_seq, + old_tx.status, tx.pid, tx.etag, - tx.tx_seq); + tx.tx_seq, + tx.status); co_return co_await forget_tx(term, stm, tx, timeout); } old_tx = tx; @@ -3027,9 +3185,11 @@ ss::future> tx_gateway_frontend::get_tx( // the same vlog( txlog.warn, - "A cached tx (tx:{} pid:{} etag:{} tx_seq:{} status:{}) of the " - "persisted (pid:{} etag:{} tx_seq:{} status:{}) should have same " - "txseq and status; terminating session", + "[tx_id={}] A 
+          "{} status: {}) of the persisted (pid: {} etag: {} "
+          "tx_seq: {} status: {}) should have the same tx_seq and status, "
+          "terminating session",
+          tx.id,
           old_tx.id,
           old_tx.pid,
           old_tx.etag,
@@ -3049,8 +3209,9 @@ ss::future<checked<tm_transaction, tx_errc>> tx_gateway_frontend::get_tx(
 
     vlog(
       txlog.warn,
-      "A persisted tx (pid:{} etag:{} tx_seq:{} status:{}) has unexpected "
-      "status",
+      "[tx_id={}] A persisted tx (pid: {} etag: {} tx_seq: {} status: {}) has "
+      "unexpected status",
+      tx.id,
       tx.pid,
       tx.etag,
       tx.tx_seq,
@@ -3065,6 +3226,12 @@ ss::future<checked<tm_transaction, tx_errc>> tx_gateway_frontend::get_latest_tx(
   model::producer_identity pid,
   kafka::transactional_id tx_id,
   model::timeout_clock::duration timeout) {
+    vlog(
+      txlog.trace,
+      "[tx_id={}] Getting latest tx for pid: {} in term: {}",
+      tx_id,
+      pid,
+      term);
     auto r0 = co_await get_tx(term, stm, tx_id, timeout);
     if (!r0.has_value()) {
         co_return r0.error();
     }
@@ -3082,14 +3249,19 @@ ss::future<checked<tm_transaction, tx_errc>> tx_gateway_frontend::get_latest_tx(
     if (tx.pid.id == pid.id && tx.pid.epoch > pid.epoch) {
         vlog(
           txlog.info,
-          "Producer {} (tx:{}) is fenced off by {}",
-          pid,
+          "[tx_id={}] producer {} is fenced off by {}",
           tx_id,
+          pid,
           tx.pid);
         co_return tx_errc::fenced;
     }
+    vlog(
+      txlog.info,
+      "[tx_id={}] transaction is mapped to {} not {}",
+      tx_id,
+      tx.pid,
+      pid);
 
-    vlog(txlog.info, "tx:{} is mapped to {} not {}", tx_id, tx.pid, pid);
     co_return tx_errc::invalid_producer_id_mapping;
 }
@@ -3103,6 +3275,11 @@ tx_gateway_frontend::get_ongoing_tx(
   model::producer_identity pid,
   kafka::transactional_id tx_id,
   model::timeout_clock::duration timeout) {
+    vlog(
+      txlog.trace,
+      "[tx_id={}] Getting ongoing transaction for pid: {}",
+      tx_id,
+      pid);
     auto r0 = co_await get_latest_tx(term, stm, pid, tx_id, timeout);
     if (!r0.has_value()) {
         if (r0.error() == tx_errc::tx_not_found) {
@@ -3133,7 +3310,8 @@ tx_gateway_frontend::get_ongoing_tx(
         // start a new transactions
         vlog(
           txlog.warn,
-          "can't modify an expired tx:{} pid:{} etag:{} tx_seq:{} in term:{}",
+          "[tx_id={}] can't modify an expired transaction pid: {} etag: {} "
+          "tx_seq: {} in term: {}",
           tx.id,
           tx.pid,
           tx.etag,
@@ -3157,9 +3335,14 @@ tx_gateway_frontend::get_ongoing_tx(
         if (!ongoing_tx.has_value()) {
             vlog(
               txlog.warn,
-              "resetting tx as ongoing failed with {} pid:{}",
-              ongoing_tx.error(),
-              pid);
+              "[tx_id={}] failed resetting ongoing transaction pid: {} etag: "
+              "{} tx_seq: {} term: {}",
+              tx.id,
+              tx.pid,
+              tx.etag,
+              tx.tx_seq,
+              term);
+
             co_return tx_errc::invalid_txn_state;
         }
         co_return ongoing_tx.value();
     }
@@ -3247,14 +3430,14 @@ ss::future<> tx_gateway_frontend::expire_old_tx(
         if (term_opt.error() == tm_stm::op_status::not_leader) {
             vlog(
               txlog.trace,
-              "this node isn't a leader for tx.id={} coordinator",
+              "[tx_id={}] this node is not the current coordinator",
               tx_id);
         }
         vlog(
           txlog.warn,
-          "got error {} on syncing state machine (loading tx.id={})",
-          term_opt.error(),
-          tx_id);
+          "[tx_id={}] error syncing state machine - {}",
+          tx_id,
+          term_opt.error());
         co_return;
     }
@@ -3284,7 +3467,8 @@ ss::future<tx_errc> tx_gateway_frontend::do_expire_old_tx(
 
     vlog(
       txlog.trace,
-      "attempting to expire tx id:{} pid:{} tx_seq:{} status:{}",
+      "[tx_id={}] attempting to expire transaction pid: {} tx_seq: {} status: "
+      "{}",
       tx.id,
       tx.pid,
       tx.tx_seq,
@@ -3296,7 +3480,11 @@ ss::future<tx_errc> tx_gateway_frontend::do_expire_old_tx(
         r = co_await do_abort_tm_tx(term, stm, tx, timeout);
     }
     if (!r.has_value()) {
-        vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx.id);
+        vlog(
+          txlog.warn,
+          "[tx_id={}] error aborting transaction - {}",
{}", + tx.id, + r.error()); co_return r.error(); } @@ -3304,7 +3492,8 @@ ss::future tx_gateway_frontend::do_expire_old_tx( // it will be retried and it's an idempotent operation auto ec = co_await stm->expire_tx(term, tx.id); if (ec != tm_stm::op_status::success) { - vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id); + vlog( + txlog.warn, "[tx_id={}] error expiring transaction - {}", tx.id, ec); co_return tx_errc::not_coordinator; } @@ -3413,7 +3602,11 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { auto shard = _shard_table.local().shard_for(tm_ntp); if (!shard.has_value()) { - vlog(txlog.warn, "can't find a shard for {}", tm_ntp); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition shard not found", + tid, + tm_ntp); co_return tx_errc::shard_not_found; } @@ -3426,8 +3619,10 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { if (!partition) { vlog( txlog.warn, - "transaction manager partition: {} not found", + "[tx_id={}] transaction manager {} partition not found", + tid, tm_ntp); + return ss::make_ready_future>( tx_errc::partition_not_found); } @@ -3436,7 +3631,10 @@ tx_gateway_frontend::describe_tx(kafka::transactional_id tid) { if (!stm) { vlog( - txlog.error, "can't get tm stm of the {}' partition", tm_ntp); + txlog.warn, + "[tx_id={}] can not get transactional manager stm for {}", + tid, + tm_ntp); return ss::make_ready_future>( tx_errc::stm_not_found); } @@ -3489,7 +3687,11 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( } auto shard = _shard_table.local().shard_for(tm_ntp.value()); if (shard == std::nullopt) { - vlog(txlog.warn, "can't find a shard for {}", tm_ntp.value()); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition shard not found", + tid, + tm_ntp); co_return tx_errc::shard_not_found; } @@ -3497,7 +3699,11 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( *shard, _ssg, [tid, ntp, tm_ntp](tx_gateway_frontend& self) { auto partition = self._partition_manager.local().get(tm_ntp.value()); if (!partition) { - vlog(txlog.warn, "can't get partition by {} ntp", tm_ntp.value()); + vlog( + txlog.warn, + "[tx_id={}] transaction manager {} partition not found", + tid, + tm_ntp); return ss::make_ready_future(tx_errc::invalid_txn_state); } @@ -3506,8 +3712,9 @@ ss::future tx_gateway_frontend::delete_partition_from_tx( if (!stm) { vlog( txlog.warn, - "can't get tm stm of the {}' partition", - tm_ntp.value()); + "[tx_id={}] can not get transactional manager stm for {}", + tid, + tm_ntp); return ss::make_ready_future(tx_errc::invalid_txn_state); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index d1b8c2ad739e6..abb33ca5537ee 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -1083,6 +1083,45 @@ std::ostream& operator<<(std::ostream& o, const nt_revision& ntr) { return o; } +std::ostream& operator<<(std::ostream& o, const fetch_tx_reply& r) { + fmt::print( + o, + "{{ec: {}, pid: {}, last_pid: {}, tx_seq: {}, timeout_ms: {}, status: " + "{}, partitions: {}, groups: {}}}", + r.ec, + r.pid, + r.last_pid, + r.tx_seq, + r.timeout_ms.count(), + r.status, + r.partitions, + r.groups); + return o; +} + +std::ostream& operator<<(std::ostream& o, const fetch_tx_reply::tx_group& g) { + fmt::print(o, "{{etag: {}, group_id: {}}}", g.etag, g.group_id); + return o; +} + +std::ostream& +operator<<(std::ostream& o, const fetch_tx_reply::tx_partition& p) { + fmt::print( + o, + "{{etag: {}, ntp: {}, revision: {}}}", + p.etag, + p.ntp, + p.topic_revision); + + return 
+}
+
+std::ostream&
+operator<<(std::ostream& o, const add_paritions_tx_request::topic& t) {
+    fmt::print(o, "{{topic: {}, partitions: {}}}", t.name, t.partitions);
+    return o;
+}
+
 } // namespace cluster
 
 namespace reflection {
diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h
index 4c0aacd66c3a9..b2e471b86c5b5 100644
--- a/src/v/cluster/types.h
+++ b/src/v/cluster/types.h
@@ -306,6 +306,7 @@ struct add_paritions_tx_request {
     struct topic {
         model::topic name{};
         std::vector<model::partition_id> partitions{};
+        friend std::ostream& operator<<(std::ostream&, const topic&);
     };
     kafka::transactional_id transactional_id{};
     kafka::producer_id producer_id{};
@@ -425,7 +426,7 @@ struct fetch_tx_reply
 
         friend bool operator==(const tx_group&, const tx_group&) = default;
 
-        friend std::ostream& operator<<(std::ostream& o, const tx_partition& r);
+        friend std::ostream& operator<<(std::ostream& o, const tx_group& r);
 
         auto serde_fields() { return std::tie(group_id, etag); }
     };
diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index c8dc032d4819f..442b2b6a05ef2 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1115,11 +1115,17 @@ consensus::remove_member(vnode node, model::revision_id new_revision) {
 ss::future<std::error_code> consensus::replace_configuration(
   std::vector<vnode> nodes, model::revision_id new_revision) {
-    return change_configuration([nodes = std::move(nodes), new_revision](
+    return change_configuration([this, nodes = std::move(nodes), new_revision](
                                   group_configuration current) mutable {
+        auto old = current;
+        current.set_version(raft::group_configuration::v_5);
         current.replace(nodes, new_revision);
-
+        vlog(
+          _ctxlog.debug,
+          "Replacing current configuration: {} with new configuration: {}",
+          old,
+          current);
         return result<group_configuration>{std::move(current)};
     });
 }
diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc
index 534752562b341..af8f85db02fd2 100644
--- a/src/v/raft/recovery_stm.cc
+++ b/src/v/raft/recovery_stm.cc
@@ -445,6 +445,18 @@ ss::future<> recovery_stm::replicate(
         _stop_requested = true;
         return ss::now();
     }
+    if (meta.value()->expected_log_end_offset >= _last_batch_offset) {
+        vlog(
+          _ctxlog.trace,
+          "follower expected log end offset is already updated, stopping "
+          "recovery. Expected log end offset: {}, recovery range last offset: "
+          "{}",
+          meta.value()->expected_log_end_offset,
+          _last_batch_offset);
+
+        _stop_requested = true;
+        return ss::now();
+    }
     /**
      * Update follower expected log end. It is equal to the last batch in a set
      * of batches read for this recovery round.