diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index 96121b46bf5bd..f14f7ea3d2713 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -432,6 +432,7 @@ redpanda_cc_library( "archival/scrubber.h", "archival/scrubber_scheduler.h", "archival/segment_reupload.h", + "archival/stm_subscriptions.h", "archival/types.h", "archival/upload_controller.h", "archival/upload_housekeeping_service.h", diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index 5b101526c171d..da920ec5a2e8e 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -1174,6 +1174,9 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) { // The offset should only be advanced after all the changes are applied. _manifest->advance_insync_offset(b.last_offset()); + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); } ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { @@ -1235,6 +1238,9 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { next_offset, start_offset, get_last_offset()); + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); } ss::future<> archival_metadata_stm::apply_local_snapshot( @@ -1305,6 +1311,10 @@ ss::future<> archival_metadata_stm::apply_local_snapshot( } else { _last_clean_at = header.offset; } + + // Notify subscribers that the STM has been updated. + _subscriptions.notify(); + co_return; } diff --git a/src/v/cluster/archival/archival_metadata_stm.h b/src/v/cluster/archival/archival_metadata_stm.h index b29d1d1dd8fd6..9bbae51d4e75b 100644 --- a/src/v/cluster/archival/archival_metadata_stm.h +++ b/src/v/cluster/archival/archival_metadata_stm.h @@ -15,6 +15,7 @@ #include "cloud_storage/partition_manifest.h" #include "cloud_storage/remote_path_provider.h" #include "cloud_storage/types.h" +#include "cluster/archival/stm_subscriptions.h" #include "cluster/errc.h" #include "cluster/state_machine_registry.h" #include "cluster/topic_table.h" @@ -287,6 +288,17 @@ class archival_metadata_stm final : public raft::persisted_stm<> { return _remote_path_provider; } + // Subscribe to STM state changes. + archival::stm_subscriptions::id_t + subscribe_to_state_change(ss::noncopyable_function f) { + return _subscriptions.subscribe(std::move(f)); + } + + // Unsubscribe from STM state changes. + void unsubscribe_from_state_change(archival::stm_subscriptions::id_t id) { + _subscriptions.unsubscribe(id); + } + private: ss::future do_sync(model::timeout_clock::duration timeout, ss::abort_source* as); @@ -394,6 +406,9 @@ class archival_metadata_stm final : public raft::persisted_stm<> { // the change in size in cloud storage until the original segment(s) are // garbage collected. size_t _compacted_replaced_bytes{0}; + + // Subscriptions to STM state changes. + archival::stm_subscriptions _subscriptions; }; class archival_metadata_stm_factory : public state_machine_factory { diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index b628098632d22..d390495f9c30a 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -338,6 +338,15 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) { } if (!_probe) { _probe.emplace(_conf->ntp_metrics_disabled, _ntp); + + // Ensure we are exception safe and won't leave the probe without + // a watcher in case of exceptions. Also ensures we won't crash calling + // the callback. + static_assert(noexcept(update_probe())); + + update_probe(); + _parent.archival_meta_stm()->subscribe_to_state_change( + [this]() noexcept { update_probe(); }); } while (!_as.abort_requested()) { @@ -797,8 +806,6 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { co_await maybe_flush_manifest_clean_offset(); } - update_probe(); - // Drop _uploads_active lock: we are not considered active while // sleeping for backoff at the end of the loop. units.return_all(); @@ -1062,8 +1069,6 @@ ss::future<> ntp_archiver::upload_until_term_change() { // flush it for them. co_await maybe_flush_manifest_clean_offset(); } - - update_probe(); } } @@ -1163,7 +1168,7 @@ ss::future ntp_archiver::sync_manifest() { co_return cloud_storage::download_result::success; } -void ntp_archiver::update_probe() { +void ntp_archiver::update_probe() noexcept { const auto& man = manifest(); _probe->segments_in_manifest(man.size()); @@ -1189,6 +1194,10 @@ bool ntp_archiver::may_begin_uploads() const { } ss::future<> ntp_archiver::stop() { + if (_stm_sub_id != archival::stm_subscriptions::id_t{}) { + _parent.archival_meta_stm()->unsubscribe_from_state_change(_stm_sub_id); + } + if (_local_segment_merger) { if (!_local_segment_merger->interrupted()) { _local_segment_merger->interrupt(); diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index 2ddd8008ea49d..d4ab88018a95f 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -9,6 +9,7 @@ */ #pragma once + #include "cloud_storage/cache_service.h" #include "cloud_storage/fwd.h" #include "cloud_storage/partition_manifest.h" @@ -21,6 +22,7 @@ #include "cluster/archival/archiver_scheduler_api.h" #include "cluster/archival/probe.h" #include "cluster/archival/scrubber.h" +#include "cluster/archival/stm_subscriptions.h" #include "cluster/archival/types.h" #include "cluster/fwd.h" #include "config/property.h" @@ -633,7 +635,7 @@ class ntp_archiver { // the state of manifest uploads. bool uploaded_data_past_flush_offset() const; - void update_probe(); + void update_probe() noexcept; /// Return true if archival metadata can be replicated. /// This means that the replica is a leader, the term did not @@ -768,6 +770,8 @@ class ntp_archiver { config::binding _initial_backoff; config::binding _max_backoff; + archival::stm_subscriptions::id_t _stm_sub_id; + friend class archiver_fixture; }; diff --git a/src/v/cluster/archival/stm_subscriptions.h b/src/v/cluster/archival/stm_subscriptions.h new file mode 100644 index 0000000000000..d1434847ee5a1 --- /dev/null +++ b/src/v/cluster/archival/stm_subscriptions.h @@ -0,0 +1,49 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "base/seastarx.h" +#include "utils/named_type.h" + +#include + +#include + +#include + +namespace archival { + +/// A class that allows to subscribe to state changes in the archival metadata +/// STM. +class stm_subscriptions { +public: + using id_t = named_type; + + id_t subscribe(ss::noncopyable_function f) { + auto id = _next_sub_id++; + _subscriptions.emplace(id, std::move(f)); + return id; + } + + void unsubscribe(id_t id) { _subscriptions.erase(id); } + + void notify() { + for (auto& [_, f] : _subscriptions) { + f(); + } + } + +private: + absl::flat_hash_map> + _subscriptions; + id_t _next_sub_id{0}; +}; + +}; // namespace archival diff --git a/tests/rptest/tests/usage_test.py b/tests/rptest/tests/usage_test.py index 4bb739389e61d..bbeeb32c5b75f 100644 --- a/tests/rptest/tests/usage_test.py +++ b/tests/rptest/tests/usage_test.py @@ -11,7 +11,7 @@ import time import random import operator -from rptest.services.redpanda import RedpandaService +from rptest.services.redpanda import MetricsEndpoint, RedpandaService from rptest.clients.rpk import RpkTool from requests.exceptions import HTTPError from rptest.services.cluster import cluster @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self): bucket_view = BucketView(self.redpanda) + def fetch_metric_usage(): + total_usage = 0 + for topic in self.topics: + usage = self.redpanda.metric_sum( + "redpanda_cloud_storage_cloud_log_size", + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS, + topic=topic.name) / topic.replication_factor + self.logger.info( + f"Metric reported cloud storage usage for topic {topic.name}: {usage}" + ) + total_usage += usage + return int(total_usage) + def check_usage(): # Check that the usage reporting system has reported correct values manifest_usage = bucket_view.cloud_log_sizes_sum().total( @@ -514,12 +527,19 @@ def check_usage(): self.logger.info( f"Max reported usages via kafka/usage_manager: {max(reported_usages)}" ) - return manifest_usage in reported_usages - - wait_until( - check_usage, - timeout_sec=30, - backoff_sec=1, - err_msg= - "Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage" - ) + if manifest_usage not in reported_usages: + self.logger.info( + f"Reported usages: {reported_usages} does not contain {manifest_usage}" + ) + return False + + metric_usage = fetch_metric_usage() + self.logger.info( + f"Metric reported cloud storage usage: {metric_usage}") + + return manifest_usage == metric_usage + + wait_until(check_usage, + timeout_sec=30, + backoff_sec=1, + err_msg="Inconsistent cloud storage usage reported")