Merge pull request #14448 from bharathv/v232x-more-tx-logging
[v23.2.x] [backport] tx: add more trace logging in init_producer_id code path
bharathv authored Oct 26, 2023
2 parents c2bf918 + 80ed295 commit b5a8b9a
Showing 7 changed files with 220 additions and 92 deletions.
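
Across these files the change is twofold: every txlog.* call site in tm_stm.cc is rewired to a new per-partition _ctx_log (a prefix_logger whose prefix is the raft group's NTP), and new trace points are added around hosted-transaction initialization, snapshot application, and log replay. Below is a minimal sketch of the prefix-logger idea in plain C++ iostreams; the real prefix_logger/vlog machinery is seastar-based, so the class shape and output format here are illustrative assumptions, not the actual API.

#include <iostream>
#include <string>
#include <utility>

// Sketch only: stands in for Redpanda's prefix_logger, which wraps a
// seastar logger and prepends a fixed prefix to every message.
class prefix_logger {
public:
    explicit prefix_logger(std::string prefix)
      : _prefix(std::move(prefix)) {}

    // Every line carries the instance's prefix, so trace output from many
    // tx coordinator partitions interleaved in one log can be attributed
    // to its NTP.
    void trace(const std::string& msg) const {
        std::clog << _prefix << " TRACE " << msg << '\n';
    }

private:
    std::string _prefix;
};

int main() {
    // In the diff the prefix is built as ssx::sformat("[{}]", _raft->ntp()).
    prefix_logger log("[kafka_internal/tx/0]");
    log.trace("Registering new tx: id=tx-1, pid=42");
}
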
75 changes: 46 additions & 29 deletions src/v/cluster/tm_stm.cc
@@ -118,7 +118,8 @@ tm_stm::tm_stm(
, _transactional_id_expiration(
config::shard_local_cfg().transactional_id_expiration_ms.value())
, _feature_table(feature_table)
, _cache(tm_stm_cache) {}
, _cache(tm_stm_cache)
, _ctx_log(logger, ssx::sformat("[{}]", _raft->ntp())) {}

ss::future<> tm_stm::start() { co_await persisted_stm::start(); }

@@ -136,6 +137,12 @@ ss::future<tm_stm::op_status> tm_stm::try_init_hosted_transactions(

auto units = co_await _cache->write_lock();

vlog(
_ctx_log.trace,
"initing hosted transactions, term: {}, partition count: {}",
term,
tx_coordinator_partition_amount);

model::partition_id partition = get_partition();
auto initial_hash_range = default_tm_hash_range(
partition, tx_coordinator_partition_amount);
@@ -244,7 +251,7 @@ ss::future<checked<model::term_id, tm_stm::op_status>> tm_stm::do_barrier() {
return term;
} catch (...) {
vlog(
txlog.error,
_ctx_log.error,
"Error during writing a barrier batch: {}",
std::current_exception());
return tm_stm::op_status::unknown;
@@ -261,7 +268,7 @@ ss::future<> tm_stm::checkpoint_ongoing_txs() {
size_t checkpointed_txes = 0;
for (auto& tx : txes_to_checkpoint) {
vlog(
txlog.trace,
_ctx_log.trace,
"transfering tx:{} etag:{} pid:{} tx_seq:{}",
tx.id,
tx.etag,
@@ -271,7 +278,7 @@
auto result = co_await update_tx(tx, tx.etag);
if (!result.has_value()) {
vlog(
txlog.warn,
_ctx_log.warn,
"Error {} transferring tx {} to new leader, transferred {}/{} "
"txns.",
result.error(),
@@ -286,13 +293,13 @@
checkpointed_txes++;
}
vlog(
txlog.info,
_ctx_log.info,
"Checkpointed all txes: {} to the new leader.",
txes_to_checkpoint.size());
}

ss::future<ss::basic_rwlock<>::holder> tm_stm::prepare_transfer_leadership() {
vlog(txlog.trace, "Preparing for leadership transfer");
vlog(_ctx_log.trace, "Preparing for leadership transfer");
auto units = co_await _cache->write_lock();
// This is a best effort basis, we checkpoint as many as we can
// and stop at the first error.
@@ -335,7 +342,7 @@ ss::future<tm_stm::op_status> tm_stm::do_update_hosted_transactions(

auto r = co_await replicate_quorum_ack(term, std::move(batch));
if (!r) {
vlog(txlog.info, "got error {} on updating hash_ranges", r.error());
vlog(_ctx_log.info, "got error {} on updating hash_ranges", r.error());
if (_c->is_leader() && _c->term() == term) {
co_await _c->step_down(
"txn coordinator update_hash_ranges replication error");
@@ -350,7 +357,7 @@ ss::future<tm_stm::op_status> tm_stm::do_update_hosted_transactions(
if (!co_await wait_no_throw(
offset, model::timeout_clock::now() + _sync_timeout)) {
vlog(
txlog.info,
_ctx_log.info,
"timeout on waiting until {} is applied on updating hash_ranges",
offset);
if (_c->is_leader() && _c->term() == term) {
@@ -360,7 +367,7 @@ ss::future<tm_stm::op_status> tm_stm::do_update_hosted_transactions(
}
if (_c->term() != term) {
vlog(
txlog.info,
_ctx_log.info,
"lost leadership while waiting until {} is applied on updating hash "
"ranges",
offset);
@@ -382,7 +389,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) {
auto r = co_await replicate_quorum_ack(term, std::move(batch));
if (!r) {
vlog(
txlog.info,
_ctx_log.info,
"got error {} on updating tx:{} pid:{} etag:{} tx_seq:{}",
r.error(),
tx.id,
@@ -403,7 +410,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) {
if (!co_await wait_no_throw(
offset, model::timeout_clock::now() + _sync_timeout)) {
vlog(
txlog.info,
_ctx_log.info,
"timeout on waiting until {} is applied on updating tx:{} pid:{} "
"tx_seq:{}",
offset,
@@ -417,7 +424,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) {
}
if (_c->term() != term) {
vlog(
txlog.info,
_ctx_log.info,
"lost leadership while waiting until {} is applied on updating tx:{} "
"pid:{} tx_seq:{}",
offset,
@@ -430,7 +437,7 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) {
auto tx_opt = _cache->find_log(tx.id);
if (!tx_opt) {
vlog(
txlog.warn,
_ctx_log.warn,
"can't find an updated tx:{} pid:{} tx_seq:{} in the cache",
tx.id,
tx.pid,
@@ -477,7 +484,7 @@ ss::future<checked<tm_transaction, tm_stm::op_status>> tm_stm::mark_tx_prepared(
auto tx_opt = co_await get_tx(tx_id);
if (!tx_opt.has_value()) {
vlog(
txlog.trace,
_ctx_log.trace,
"got {} on pulling tx {} to mark it prepared",
tx_opt.error(),
tx_id);
@@ -490,7 +497,7 @@ ss::future<checked<tm_transaction, tm_stm::op_status>> tm_stm::mark_tx_prepared(
: tm_transaction::tx_status::preparing;
if (tx.status != check_status) {
vlog(
txlog.warn,
_ctx_log.warn,
"can't mark tx:{} pid:{} tx_seq:{} prepared wrong status {} != {}",
tx.id,
tx.pid,
@@ -533,7 +540,7 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) {
co_return tm_stm::op_status::conflict;
}
vlog(
txlog.trace,
_ctx_log.trace,
"observed a transferring tx:{} pid:{} etag:{} tx_seq:{} in term:{}",
tx_id,
tx.pid,
@@ -543,7 +550,7 @@ tm_stm::reset_transferring(model::term_id term, kafka::transactional_id tx_id) {
if (tx.etag == term) {
// case 1 - Unlikely, just reset the transferring flag.
vlog(
txlog.warn,
_ctx_log.warn,
"tx: {} transferring within same term: {}, resetting.",
tx_id,
tx.etag);
@@ -569,7 +576,7 @@ ss::future<checked<tm_transaction, tm_stm::op_status>> tm_stm::mark_tx_ongoing(
tm_transaction tx = tx_opt.value();
if (tx.etag != expected_term) {
vlog(
txlog.warn,
_ctx_log.warn,
"An attempt to update state data tx:{} pid:{} tx_seq:{} etag:{} "
"assuming etag is {}",
tx.id,
@@ -594,7 +601,7 @@ ss::future<tm_stm::op_status> tm_stm::re_register_producer(
std::chrono::milliseconds transaction_timeout_ms,
model::producer_identity pid,
model::producer_identity last_pid) {
vlog(txlog.trace, "Registering existing tx: id={}, pid={}", tx_id, pid);
vlog(_ctx_log.trace, "Registering existing tx: id={}, pid={}", tx_id, pid);

auto tx_opt = co_await get_tx(tx_id);
if (!tx_opt.has_value()) {
@@ -638,7 +645,7 @@ ss::future<tm_stm::op_status> tm_stm::do_register_new_producer(
kafka::transactional_id tx_id,
std::chrono::milliseconds transaction_timeout_ms,
model::producer_identity pid) {
vlog(txlog.trace, "Registering new tx: id={}, pid={}", tx_id, pid);
vlog(_ctx_log.trace, "Registering new tx: id={}, pid={}", tx_id, pid);

auto tx_opt = co_await get_tx(tx_id);
if (tx_opt.has_value()) {
@@ -687,13 +694,13 @@ ss::future<tm_stm::op_status> tm_stm::add_partitions(
std::vector<tm_transaction::tx_partition> partitions) {
auto tx_opt = find_tx(tx_id);
if (!tx_opt) {
vlog(txlog.warn, "An ongoing transaction tx:{} isn't found", tx_id);
vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id);
co_return tm_stm::op_status::unknown;
}
auto tx = tx_opt.value();
if (tx.status != tm_transaction::tx_status::ongoing) {
vlog(
txlog.warn,
_ctx_log.warn,
"Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} "
"status:{}",
tx.id,
@@ -705,7 +712,7 @@ ss::future<tm_stm::op_status> tm_stm::add_partitions(
}
if (tx.etag != expected_term) {
vlog(
txlog.warn,
_ctx_log.warn,
"An attempt to add partitions to tx:{} pid:{} tx_seq:{} etag:{} "
"assuming etag is {}",
tx.id,
@@ -750,13 +757,13 @@ ss::future<tm_stm::op_status> tm_stm::add_group(
model::term_id etag) {
auto tx_opt = find_tx(tx_id);
if (!tx_opt) {
vlog(txlog.warn, "An ongoing transaction tx:{} isn't found", tx_id);
vlog(_ctx_log.warn, "An ongoing transaction tx:{} isn't found", tx_id);
co_return tm_stm::op_status::unknown;
}
auto tx = tx_opt.value();
if (tx.status != tm_transaction::tx_status::ongoing) {
vlog(
txlog.warn,
_ctx_log.warn,
"Expected an ongoing txn, found tx:{} pid:{} tx_seq:{} etag:{} "
"status:{}",
tx.id,
@@ -768,7 +775,7 @@ ss::future<tm_stm::op_status> tm_stm::add_group(
}
if (tx.etag != expected_term) {
vlog(
txlog.warn,
_ctx_log.warn,
"An attempt to add group to tx:{} pid:{} tx_seq:{} etag:{} assuming "
"etag is {}",
tx.id,
@@ -844,6 +851,11 @@ tm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tm_ss_buf) {
_insync_offset = data.offset;
_hosted_txes = std::move(data.hash_ranges);
_hosted_txes.inited = true;
vlog(
_ctx_log.trace,
"Applied snapshot at offset: {}, hosted txes: {}",
hdr.offset,
_hosted_txes);
}

return ss::now();
@@ -960,7 +972,7 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) {
if (tx.status == tm_transaction::tx_status::tombstone) {
_cache->erase_log(tx.id);
vlog(
txlog.trace,
_ctx_log.trace,
"erasing {} (tombstone) pid:{} tx_seq:{} etag:{} in term:{} from mem",
tx.id,
tx.pid,
@@ -980,7 +992,7 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) {
|| (old_tx.etag == tx.etag && old_tx.tx_seq <= tx.tx_seq)) {
_cache->erase_mem(tx.id);
vlog(
txlog.trace,
_ctx_log.trace,
"erasing {} (log overwrite) pid:{} tx_seq:{} etag:{} from mem in "
"term:{} by pid:{} etag:{} tx_seq:{}",
old_tx.id,
@@ -1015,6 +1027,11 @@ ss::future<> tm_stm::apply_hosted_transactions(model::record_batch b) {
auto hash_ranges = serde::from_iobuf<tm_tx_hosted_transactions>(
rec.release_value());
_hosted_txes = hash_ranges;
vlog(
_ctx_log.trace,
"Applied hosted txes batch from log, offset: {}, ranges: {}",
b.base_offset(),
_hosted_txes);
return ss::now();
}

@@ -1142,7 +1159,7 @@ tm_stm::expire_tx(model::term_id term, kafka::transactional_id tx_id) {
auto r0 = co_await update_tx(std::move(tx), etag);
if (r0.has_value()) {
vlog(
txlog.error,
_ctx_log.error,
"written tombstone should evict tx:{} from the cache",
tx_id);
co_return tm_stm::op_status::unknown;
1 change: 1 addition & 0 deletions src/v/cluster/tm_stm.h
@@ -292,6 +292,7 @@ class tm_stm final : public persisted_stm<> {
ss::lw_shared_ptr<cluster::tm_stm_cache> _cache;
tm_tx_hosted_transactions _hosted_txes;
mutex _tx_thrashing_lock;
prefix_logger _ctx_log;

ss::future<> apply(model::record_batch b) override;
ss::future<> apply_hosted_transactions(model::record_batch b);
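
A side note on the one-line header change above: C++ initializes non-static data members in declaration order, not in the order they appear in the constructor's initializer list, so declaring _ctx_log last in the class matches its position at the end of the init list in tm_stm.cc. A tiny standalone illustration of that ordering rule (the types are stand-ins, not the Redpanda ones):

#include <string>

struct stm {
    std::string ntp;     // declared first, so initialized first
    std::string ctx_log; // declared second, so it may safely read ntp

    stm()
      : ntp("kafka_internal/tx/0")
      , ctx_log("[" + ntp + "]") {}
};

int main() {
    return stm{}.ctx_log == "[kafka_internal/tx/0]" ? 0 : 1;
}
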
32 changes: 32 additions & 0 deletions src/v/cluster/tm_tx_hash_ranges.h
@@ -70,6 +70,12 @@ struct tm_hash_range
return (r.first >= first && r.first <= last)
|| (r.last >= first && r.last <= last) || r.contains(*this);
}

friend std::ostream&
operator<<(std::ostream& o, const tm_hash_range& range) {
fmt::print(o, "[{}, {}]", range.first, range.last);
return o;
}
};

inline tm_hash_range default_tm_hash_range(
@@ -149,6 +155,12 @@ struct tm_hash_ranges_set
return range1.intersects(range2);
});
}

friend std::ostream&
operator<<(std::ostream& o, const tm_hash_ranges_set& ranges) {
fmt::print(o, "{{ {} }}", ranges.ranges);
return o;
}
};

using repartitioning_id = named_type<int64_t, struct repartitioning_id_type>;
Expand All @@ -170,6 +182,15 @@ struct draining_txs
, transactions(std::move(txs)) {}

auto serde_fields() { return std::tie(id, ranges, transactions); }
friend std::ostream& operator<<(std::ostream& o, const draining_txs& txes) {
fmt::print(
o,
"{{ id: {}, ranges: {}, transactions: {} }}",
txes.id,
txes.ranges,
txes.transactions.size());
return o;
}
};

struct tm_tx_hosted_transactions
@@ -276,6 +297,17 @@ struct tm_tx_hosted_transactions
auto tx_id_hash = get_tx_id_hash(tx_id);
return hash_ranges.contains(tx_id_hash);
}

friend std::ostream&
operator<<(std::ostream& o, const tm_tx_hosted_transactions& h) {
fmt::print(
o,
"{{ ranges: {}, excluded: {}, included: {} }}",
h.hash_ranges,
h.excluded_transactions.size(),
h.included_transactions.size());
return o;
}
};

} // namespace cluster
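
These operator<< overloads are what let the new trace lines in tm_stm.cc print _hosted_txes and the hash-range types directly. The pattern is the usual fmt-over-ostream one; a standalone sketch follows (hash_range below is a stand-in for cluster::tm_hash_range, and note that fmt 9+ requires an explicit opt-in before its formatters fall back to operator<<):

#include <cstdint>
#include <iostream>

#include <fmt/format.h>
#include <fmt/ostream.h>

struct hash_range {
    uint32_t first;
    uint32_t last;

    // Same shape as the friend operator<< added in tm_tx_hash_ranges.h:
    // delegate formatting of the type to fmt::print over the stream.
    friend std::ostream& operator<<(std::ostream& o, const hash_range& r) {
        fmt::print(o, "[{}, {}]", r.first, r.last);
        return o;
    }
};

// fmt >= 9 no longer uses operator<< implicitly; opt in per type.
template <>
struct fmt::formatter<hash_range> : fmt::ostream_formatter {};

int main() {
    hash_range r{0, 4095};
    std::cout << r << '\n';        // prints: [0, 4095]
    fmt::print("range: {}\n", r);  // prints: range: [0, 4095]
}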