From 734375bbde45f9a39c7bac9ffd07c55d7f68997c Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 22 Nov 2024 12:31:37 +0000 Subject: [PATCH] archival: consistent log size probes across replicas We called update probe only from leaders and after exiting the upload loop which led to inconsistent and stale metrics. Fix this by introducing a subscription mechanism to the STM which is the source of truth for the manifest state and must be consistent across all replicas. Another option is to report metrics only from the leader :think: --- src/v/cluster/BUILD | 1 + .../cluster/archival/archival_metadata_stm.cc | 10 ++++ .../cluster/archival/archival_metadata_stm.h | 15 ++++++ .../cluster/archival/ntp_archiver_service.cc | 19 +++++-- src/v/cluster/archival/ntp_archiver_service.h | 6 ++- src/v/cluster/archival/stm_subscriptions.h | 50 +++++++++++++++++++ tests/rptest/tests/usage_test.py | 40 +++++++++++---- 7 files changed, 125 insertions(+), 16 deletions(-) create mode 100644 src/v/cluster/archival/stm_subscriptions.h diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index 96121b46bf5bd..f14f7ea3d2713 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -432,6 +432,7 @@ redpanda_cc_library( "archival/scrubber.h", "archival/scrubber_scheduler.h", "archival/segment_reupload.h", + "archival/stm_subscriptions.h", "archival/types.h", "archival/upload_controller.h", "archival/upload_housekeeping_service.h", diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index 5b101526c171d..da920ec5a2e8e 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -1174,6 +1174,9 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) { // The offset should only be advanced after all the changes are applied. _manifest->advance_insync_offset(b.last_offset()); + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); } ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { @@ -1235,6 +1238,9 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { next_offset, start_offset, get_last_offset()); + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); } ss::future<> archival_metadata_stm::apply_local_snapshot( @@ -1305,6 +1311,10 @@ ss::future<> archival_metadata_stm::apply_local_snapshot( } else { _last_clean_at = header.offset; } + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); + co_return; } diff --git a/src/v/cluster/archival/archival_metadata_stm.h b/src/v/cluster/archival/archival_metadata_stm.h index b29d1d1dd8fd6..9bbae51d4e75b 100644 --- a/src/v/cluster/archival/archival_metadata_stm.h +++ b/src/v/cluster/archival/archival_metadata_stm.h @@ -15,6 +15,7 @@ #include "cloud_storage/partition_manifest.h" #include "cloud_storage/remote_path_provider.h" #include "cloud_storage/types.h" +#include "cluster/archival/stm_subscriptions.h" #include "cluster/errc.h" #include "cluster/state_machine_registry.h" #include "cluster/topic_table.h" @@ -287,6 +288,17 @@ class archival_metadata_stm final : public raft::persisted_stm<> { return _remote_path_provider; } + // Subscribe to STM state changes. + archival::stm_subscriptions::id_t + subscribe_to_state_change(ss::noncopyable_function f) { + return _subscriptions.subscribe(std::move(f)); + } + + // Unsubscribe from STM state changes. + void unsubscribe_from_state_change(archival::stm_subscriptions::id_t id) { + _subscriptions.unsubscribe(id); + } + private: ss::future do_sync(model::timeout_clock::duration timeout, ss::abort_source* as); @@ -394,6 +406,9 @@ class archival_metadata_stm final : public raft::persisted_stm<> { // the change in size in cloud storage until the original segment(s) are // garbage collected. size_t _compacted_replaced_bytes{0}; + + // Subscriptions to STM state changes. + archival::stm_subscriptions _subscriptions; }; class archival_metadata_stm_factory : public state_machine_factory { diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index b628098632d22..d390495f9c30a 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -338,6 +338,15 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) { } if (!_probe) { _probe.emplace(_conf->ntp_metrics_disabled, _ntp); + + // Ensure we are exception safe and won't leave the probe without + // a watcher in case of exceptions. Also ensures we won't crash calling + // the callback. + static_assert(noexcept(update_probe())); + + update_probe(); + _parent.archival_meta_stm()->subscribe_to_state_change( + [this]() noexcept { update_probe(); }); } while (!_as.abort_requested()) { @@ -797,8 +806,6 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { co_await maybe_flush_manifest_clean_offset(); } - update_probe(); - // Drop _uploads_active lock: we are not considered active while // sleeping for backoff at the end of the loop. units.return_all(); @@ -1062,8 +1069,6 @@ ss::future<> ntp_archiver::upload_until_term_change() { // flush it for them. co_await maybe_flush_manifest_clean_offset(); } - - update_probe(); } } @@ -1163,7 +1168,7 @@ ss::future ntp_archiver::sync_manifest() { co_return cloud_storage::download_result::success; } -void ntp_archiver::update_probe() { +void ntp_archiver::update_probe() noexcept { const auto& man = manifest(); _probe->segments_in_manifest(man.size()); @@ -1189,6 +1194,10 @@ bool ntp_archiver::may_begin_uploads() const { } ss::future<> ntp_archiver::stop() { + if (_stm_sub_id != archival::stm_subscriptions::id_t{}) { + _parent.archival_meta_stm()->unsubscribe_from_state_change(_stm_sub_id); + } + if (_local_segment_merger) { if (!_local_segment_merger->interrupted()) { _local_segment_merger->interrupt(); diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index 2ddd8008ea49d..d4ab88018a95f 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -9,6 +9,7 @@ */ #pragma once + #include "cloud_storage/cache_service.h" #include "cloud_storage/fwd.h" #include "cloud_storage/partition_manifest.h" @@ -21,6 +22,7 @@ #include "cluster/archival/archiver_scheduler_api.h" #include "cluster/archival/probe.h" #include "cluster/archival/scrubber.h" +#include "cluster/archival/stm_subscriptions.h" #include "cluster/archival/types.h" #include "cluster/fwd.h" #include "config/property.h" @@ -633,7 +635,7 @@ class ntp_archiver { // the state of manifest uploads. bool uploaded_data_past_flush_offset() const; - void update_probe(); + void update_probe() noexcept; /// Return true if archival metadata can be replicated. /// This means that the replica is a leader, the term did not @@ -768,6 +770,8 @@ class ntp_archiver { config::binding _initial_backoff; config::binding _max_backoff; + archival::stm_subscriptions::id_t _stm_sub_id; + friend class archiver_fixture; }; diff --git a/src/v/cluster/archival/stm_subscriptions.h b/src/v/cluster/archival/stm_subscriptions.h new file mode 100644 index 0000000000000..0407fd3bc2900 --- /dev/null +++ b/src/v/cluster/archival/stm_subscriptions.h @@ -0,0 +1,50 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "base/seastarx.h" +#include "cluster/archival/stm_subscriptions.h" +#include "utils/named_type.h" + +#include + +#include + +#include + +namespace archival { + +/// A class that allows to subscribe to state changes in the archival metadata +/// STM. +class stm_subscriptions { +public: + using id_t = named_type; + + id_t subscribe(ss::noncopyable_function f) { + auto id = _next_sub_id++; + _subscriptions.emplace(id, std::move(f)); + return id; + } + + void unsubscribe(id_t id) { _subscriptions.erase(id); } + + void notify() { + for (auto& [_, f] : _subscriptions) { + f(); + } + } + +private: + absl::flat_hash_map> + _subscriptions; + id_t _next_sub_id{0}; +}; + +}; // namespace archival diff --git a/tests/rptest/tests/usage_test.py b/tests/rptest/tests/usage_test.py index 4bb739389e61d..bbeeb32c5b75f 100644 --- a/tests/rptest/tests/usage_test.py +++ b/tests/rptest/tests/usage_test.py @@ -11,7 +11,7 @@ import time import random import operator -from rptest.services.redpanda import RedpandaService +from rptest.services.redpanda import MetricsEndpoint, RedpandaService from rptest.clients.rpk import RpkTool from requests.exceptions import HTTPError from rptest.services.cluster import cluster @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self): bucket_view = BucketView(self.redpanda) + def fetch_metric_usage(): + total_usage = 0 + for topic in self.topics: + usage = self.redpanda.metric_sum( + "redpanda_cloud_storage_cloud_log_size", + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS, + topic=topic.name) / topic.replication_factor + self.logger.info( + f"Metric reported cloud storage usage for topic {topic.name}: {usage}" + ) + total_usage += usage + return int(total_usage) + def check_usage(): # Check that the usage reporting system has reported correct values manifest_usage = bucket_view.cloud_log_sizes_sum().total( @@ -514,12 +527,19 @@ def check_usage(): self.logger.info( f"Max reported usages via kafka/usage_manager: {max(reported_usages)}" ) - return manifest_usage in reported_usages - - wait_until( - check_usage, - timeout_sec=30, - backoff_sec=1, - err_msg= - "Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage" - ) + if manifest_usage not in reported_usages: + self.logger.info( + f"Reported usages: {reported_usages} does not contain {manifest_usage}" + ) + return False + + metric_usage = fetch_metric_usage() + self.logger.info( + f"Metric reported cloud storage usage: {metric_usage}") + + return manifest_usage == metric_usage + + wait_until(check_usage, + timeout_sec=30, + backoff_sec=1, + err_msg="Inconsistent cloud storage usage reported")