From f5ed5134e60de12528901a6d90a2d6351b44372b Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 2 Jul 2024 23:23:02 -0400 Subject: [PATCH 1/3] cluster: cache kafka offset of last applied record in log_eviction_stm This allows the log_eviction_stm to know what the current kafka start offset override is even if it doesn't reside in the local log. However, since this cached value is not snapshotted by the stm it may not be recovered when repanda restarts. In this case one would need to fallback on the archival stm to find the kafka start offset override. (cherry picked from commit feeec639f2d52379d33100c505682113deb03817) --- src/v/cluster/log_eviction_stm.cc | 36 ++++++++++++++++++++++++++++--- src/v/cluster/log_eviction_stm.h | 21 +++++++++++------- src/v/cluster/partition.cc | 25 ++++++--------------- 3 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index e30a0eb12c7d..8347bc1fa532 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -217,8 +217,37 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) { raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data))); } -ss::future> -log_eviction_stm::sync_start_offset_override( +kafka::offset log_eviction_stm::kafka_start_offset_override() { + if (_cached_kafka_start_offset_override != kafka::offset{}) { + return _cached_kafka_start_offset_override; + } + + // Since the STM doesn't snapshot `_cached_kafka_start_override` its + // possible for it to be lost during restarts. Therefore the raft offset + // which is snapshotted will be translated if possible. + if (_delete_records_eviction_offset == model::offset{}) { + return kafka::offset{}; + } + + auto raft_start_offset_override = model::next_offset( + _delete_records_eviction_offset); + + // This handles an edge case where the stm will not record any raft + // offsets that do not land in local storage. Hence returning + // `kafka::offset{}` indicates to the caller that the archival stm + // should be queried for the offset instead. + if (raft_start_offset_override <= _raft->start_offset()) { + return kafka::offset{}; + } + + _cached_kafka_start_offset_override = model::offset_cast( + _raft->log()->from_log_offset(raft_start_offset_override)); + + return _cached_kafka_start_offset_override; +} + +ss::future> +log_eviction_stm::sync_kafka_start_offset_override( model::timeout_clock::duration timeout) { /// Call this method to ensure followers have processed up until the /// most recent known version of the special batch. This is particularly @@ -232,7 +261,7 @@ log_eviction_stm::sync_start_offset_override( co_return errc::timeout; } } - co_return start_offset_override(); + co_return kafka_start_offset_override(); } model::offset log_eviction_stm::effective_start_offset() const { @@ -369,6 +398,7 @@ ss::future<> log_eviction_stm::apply(const model::record_batch& batch) { } const auto record = serde::from_iobuf( batch.copy_records().begin()->release_value()); + _cached_kafka_start_offset_override = record.kafka_start_offset; if (record.rp_start_offset == model::offset{}) { // This may happen if the requested offset was not in the local log at // time of replicating. We still need to have replicated it though so diff --git a/src/v/cluster/log_eviction_stm.h b/src/v/cluster/log_eviction_stm.h index a992a0675200..0338bf240baf 100644 --- a/src/v/cluster/log_eviction_stm.h +++ b/src/v/cluster/log_eviction_stm.h @@ -94,15 +94,17 @@ class log_eviction_stm /// This only returns the start override, if one exists. It does not take /// into account local storage, and may not even point to an offset that /// exists in local storage (e.g. if we have locally truncated). - ss::future - sync_start_offset_override(model::timeout_clock::duration timeout); + /// + /// If `kafka::offset{}` is returned and archival storage is enabled for the + /// given ntp then the caller should fall back on the archival stm to check + /// if a start offset override exists and if so what its value is. + ss::future> + sync_kafka_start_offset_override(model::timeout_clock::duration timeout); - model::offset start_offset_override() const { - if (_delete_records_eviction_offset == model::offset{}) { - return model::offset{}; - } - return model::next_offset(_delete_records_eviction_offset); - } + /// If `kafka::offset{}` is returned and archival storage is enabled for the + /// given ntp then the caller should fall back on the archival stm to check + /// if a start offset override exists and if so what its value is. + kafka::offset kafka_start_offset_override(); ss::future take_snapshot(model::offset) final { co_return iobuf{}; } @@ -144,6 +146,9 @@ class log_eviction_stm // Should be signaled every time either of the above offsets are updated. ss::condition_variable _has_pending_truncation; + + // Kafka offset of the last `prefix_truncate_record` applied to this stm. + kafka::offset _cached_kafka_start_offset_override; }; class log_eviction_stm_factory : public state_machine_factory { diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 497fc6d5fc39..348f2a54b873 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -1228,22 +1228,14 @@ ss::future> partition::sync_kafka_start_offset_override( model::timeout_clock::duration timeout) { if (_log_eviction_stm && !is_read_replica_mode_enabled()) { - auto offset_res - = co_await _log_eviction_stm->sync_start_offset_override(timeout); + auto offset_res = co_await _log_eviction_stm + ->sync_kafka_start_offset_override(timeout); if (offset_res.has_failure()) { co_return offset_res.as_failure(); } - // The eviction STM only keeps track of DeleteRecords truncations - // as Raft offsets. Translate if possible. - if ( - offset_res.value() != model::offset{} - && _raft->start_offset() < offset_res.value()) { - auto start_kafka_offset = log()->from_log_offset( - offset_res.value()); - co_return start_kafka_offset; + if (offset_res.value() != kafka::offset{}) { + co_return kafka::offset_cast(offset_res.value()); } - // If a start override is no longer in the offset translator state, - // it may have been uploaded and persisted in the manifest. } if (_archival_meta_stm) { auto term = _raft->term(); @@ -1318,13 +1310,10 @@ partition::archival_meta_stm() const { std::optional partition::kafka_start_offset_override() const { if (_log_eviction_stm && !is_read_replica_mode_enabled()) { - auto o = _log_eviction_stm->start_offset_override(); - if (o != model::offset{} && _raft->start_offset() < o) { - auto start_kafka_offset = log()->from_log_offset(o); - return start_kafka_offset; + auto o = _log_eviction_stm->kafka_start_offset_override(); + if (o != kafka::offset{}) { + return kafka::offset_cast(o); } - // If a start override is no longer in the offset translator state, - // it may have been uploaded and persisted in the manifest. } if (_archival_meta_stm) { auto o From bba50c6645640812bc78978d76ac6a5632abfe2b Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Sat, 8 Jun 2024 23:23:08 -0400 Subject: [PATCH 2/3] cluster: avoid syncing archival_meta_stm in sync_kafka_start_offset_override (cherry picked from commit 3ba042d4ee19e1329c8faedd336d65d483c3b462) --- src/v/cluster/partition.cc | 52 ++++++++++++++++++++++++++++++++------ src/v/cluster/partition.h | 4 +++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 348f2a54b873..ce344cb7e671 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -1227,7 +1227,22 @@ partition::get_cloud_storage_manifest_view() { ss::future> partition::sync_kafka_start_offset_override( model::timeout_clock::duration timeout) { - if (_log_eviction_stm && !is_read_replica_mode_enabled()) { + if (is_read_replica_mode_enabled()) { + auto term = _raft->term(); + if (!co_await _archival_meta_stm->sync(timeout)) { + if (term != _raft->term()) { + co_return errc::not_leader; + } else { + co_return errc::timeout; + } + } + auto start_kafka_offset + = _archival_meta_stm->manifest().get_start_kafka_offset_override(); + + co_return kafka::offset_cast(start_kafka_offset); + } + + if (_log_eviction_stm) { auto offset_res = co_await _log_eviction_stm ->sync_kafka_start_offset_override(timeout); if (offset_res.has_failure()) { @@ -1237,7 +1252,29 @@ partition::sync_kafka_start_offset_override( co_return kafka::offset_cast(offset_res.value()); } } - if (_archival_meta_stm) { + + if (!_archival_meta_stm) { + co_return model::offset{}; + } + + // There are a few cases in which the log_eviction_stm will return a kafka + // offset of `kafka::offset{}` for the start offset override. + // - The topic was remotely recovered. + // - A start offset override was never set. + // - The broker has restarted and the log_eviction_stm couldn't recover the + // kafka offset for the start offset override. + // + // In all cases we'll need to fall back to the archival stm to figure out if + // a start offset override exists, and if so, what it is. + // + // For this we'll sync the archival stm a single time to ensure we have the + // most up-to-date manifest. From that point onwards the offset + // `_archival_meta_stm->manifest().get_start_kafka_offset_override()` will + // be correct without having to sync again. This is since the offset will + // not change until another offset override has been applied to the log + // eviction stm. And at that point the log eviction stm will be able to give + // us the correct offset override. + if (!_has_synced_archival_for_start_override) [[unlikely]] { auto term = _raft->term(); if (!co_await _archival_meta_stm->sync(timeout)) { if (term != _raft->term()) { @@ -1246,13 +1283,12 @@ partition::sync_kafka_start_offset_override( co_return errc::timeout; } } - auto start_kafka_offset - = _archival_meta_stm->manifest().get_start_kafka_offset_override(); - if (start_kafka_offset != kafka::offset{}) { - co_return kafka::offset_cast(start_kafka_offset); - } + _has_synced_archival_for_start_override = true; } - co_return model::offset{}; + + auto start_kafka_offset + = _archival_meta_stm->manifest().get_start_kafka_offset_override(); + co_return kafka::offset_cast(start_kafka_offset); } model::offset partition::last_stable_offset() const { diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index b1679bd1ecbf..dacf40b80fbe 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -378,6 +378,10 @@ class partition : public ss::enable_lw_shared_from_this { ss::sharded& _upload_housekeeping; + // Used in `sync_kafka_start_offset_override` to avoid having to re-sync the + // `archival_meta_stm`. + bool _has_synced_archival_for_start_override{false}; + friend std::ostream& operator<<(std::ostream& o, const partition& x); }; } // namespace cluster From 281778f275773359e3b45ae1c2e22fc4c4423b44 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Sun, 23 Jun 2024 16:22:27 -0400 Subject: [PATCH 3/3] treewide: remove coroutines in validate_fetch_offset callees The most common path for validate_fetch_offset results in a number of short lived coroutines. In these cases the allocation/deallocation for the coroutine's frame ends up dominating the runtime for the function. This commit removes the coroutines in favor for then chains which can avoid the allocation if the task quota hasn't been met. (cherry picked from commit e96451ad86d860097a7f1928ac3445f411459318) --- src/v/cluster/log_eviction_stm.cc | 19 ++-- src/v/kafka/server/replicated_partition.cc | 102 +++++++++++---------- src/v/raft/persisted_stm.cc | 22 +++-- 3 files changed, 77 insertions(+), 66 deletions(-) diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index 8347bc1fa532..ead87335e291 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -254,14 +254,17 @@ log_eviction_stm::sync_kafka_start_offset_override( /// useful to know if the start offset is up to date in the case /// leadership has recently changed for example. auto term = _raft->term(); - if (!co_await sync(timeout)) { - if (term != _raft->term()) { - co_return errc::not_leader; - } else { - co_return errc::timeout; - } - } - co_return kafka_start_offset_override(); + return sync(timeout).then( + [this, term](bool success) -> result { + if (!success) { + if (term != _raft->term()) { + return errc::not_leader; + } else { + return errc::timeout; + } + } + return kafka_start_offset_override(); + }); } model::offset log_eviction_stm::effective_start_offset() const { diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index d323cef241a4..00fff8bd960a 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -46,30 +46,32 @@ const model::ntp& replicated_partition::ntp() const { ss::future> replicated_partition::sync_effective_start( model::timeout_clock::duration timeout) { - auto synced_start_offset_override - = co_await _partition->sync_kafka_start_offset_override(timeout); - if (synced_start_offset_override.has_failure()) { - auto err = synced_start_offset_override.error(); - auto error_code = error_code::unknown_server_error; - if (err.category() == cluster::error_category()) { - switch (cluster::errc(err.value())) { - /** - * In the case of timeout and shutting down errors return - * not_leader_for_partition error to force clients retry. - */ - case cluster::errc::shutting_down: - case cluster::errc::not_leader: - case cluster::errc::timeout: - error_code = error_code::not_leader_for_partition; - break; - default: - error_code = error_code::unknown_server_error; - } - } - co_return error_code; - } - co_return kafka_start_offset_with_override( - synced_start_offset_override.value()); + return _partition->sync_kafka_start_offset_override(timeout).then( + [this](auto synced_start_offset_override) + -> result { + if (synced_start_offset_override.has_failure()) { + auto err = synced_start_offset_override.error(); + auto error_code = error_code::unknown_server_error; + if (err.category() == cluster::error_category()) { + switch (cluster::errc(err.value())) { + /** + * In the case of timeout and shutting down errors return + * not_leader_for_partition error to force clients retry. + */ + case cluster::errc::shutting_down: + case cluster::errc::not_leader: + case cluster::errc::timeout: + error_code = error_code::not_leader_for_partition; + break; + default: + error_code = error_code::unknown_server_error; + } + } + return error_code; + } + return kafka_start_offset_with_override( + synced_start_offset_override.value()); + }); } model::offset replicated_partition::start_offset() const { @@ -577,36 +579,40 @@ ss::future replicated_partition::validate_fetch_offset( ec); } - co_return ec; + return ss::make_ready_future(ec); } // Grab the up to date start offset auto timeout = deadline - model::timeout_clock::now(); - auto start_offset = co_await sync_effective_start(timeout); - if (!start_offset) { - vlog( - klog.warn, - "ntp {}: error obtaining latest start offset - {}", - ntp(), - start_offset.error()); - co_return start_offset.error(); - } + return sync_effective_start(timeout).then( + [this, fetch_offset](auto start_offset) { + if (!start_offset) { + vlog( + klog.warn, + "ntp {}: error obtaining latest start offset - {}", + ntp(), + start_offset.error()); + return start_offset.error(); + } - if ( - fetch_offset < start_offset.value() || fetch_offset > log_end_offset()) { - vlog( - klog.warn, - "ntp {}: fetch offset_out_of_range on leader, requested: {}, " - "partition start offset: {}, high watermark: {}, log end offset: {}", - ntp(), - fetch_offset, - start_offset.value(), - high_watermark(), - log_end_offset()); - co_return error_code::offset_out_of_range; - } + if ( + fetch_offset < start_offset.value() + || fetch_offset > log_end_offset()) { + vlog( + klog.warn, + "ntp {}: fetch offset_out_of_range on leader, requested: {}, " + "partition start offset: {}, high watermark: {}, log end " + "offset: {}", + ntp(), + fetch_offset, + start_offset.value(), + high_watermark(), + log_end_offset()); + return error_code::offset_out_of_range; + } - co_return error_code::none; + return error_code::none; + }); } result replicated_partition::get_partition_info() const { diff --git a/src/v/raft/persisted_stm.cc b/src/v/raft/persisted_stm.cc index 1a42dbb53f2e..3ae7802a79ac 100644 --- a/src/v/raft/persisted_stm.cc +++ b/src/v/raft/persisted_stm.cc @@ -491,16 +491,16 @@ ss::future persisted_stm::sync(model::timeout_clock::duration timeout) { auto term = _raft->term(); if (!_raft->is_leader()) { - co_return false; + return ss::make_ready_future(false); } if (_insync_term == term) { - co_return true; + return ss::make_ready_future(true); } if (_is_catching_up) { auto deadline = model::timeout_clock::now() + timeout; auto sync_waiter = ss::make_lw_shared>(); _sync_waiters.push_back(sync_waiter); - co_return co_await sync_waiter->get_future_with_timeout( + return sync_waiter->get_future_with_timeout( deadline, [] { return false; }); } _is_catching_up = true; @@ -526,14 +526,16 @@ persisted_stm::sync(model::timeout_clock::duration timeout) { // of the term yet. sync_offset = log_offsets.dirty_offset; } - auto is_synced = co_await do_sync(timeout, sync_offset, term); - _is_catching_up = false; - for (auto& sync_waiter : _sync_waiters) { - sync_waiter->set_value(is_synced); - } - _sync_waiters.clear(); - co_return is_synced; + return do_sync(timeout, sync_offset, term).then([this](bool is_synced) { + _is_catching_up = false; + for (auto& sync_waiter : _sync_waiters) { + sync_waiter->set_value(is_synced); + } + _sync_waiters.clear(); + + return is_synced; + }); } template