diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 17b721af8940f..cdc79821cfe41 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -320,7 +320,8 @@ ss::future<> ntp_archiver::upload_until_abort() { co_return; } if (!_probe) { - _probe.emplace(_conf->ntp_metrics_disabled, _ntp); + _probe.emplace( + _conf->ntp_metrics_disabled, _ntp, _parent.archival_meta_stm()); } while (!_as.abort_requested()) { @@ -436,7 +437,8 @@ ss::future<> ntp_archiver::sync_manifest_until_abort() { co_return; } if (!_probe) { - _probe.emplace(_conf->ntp_metrics_disabled, _ntp); + _probe.emplace( + _conf->ntp_metrics_disabled, _ntp, _parent.archival_meta_stm()); } while (!_as.abort_requested()) { @@ -769,8 +771,6 @@ ss::future<> ntp_archiver::upload_until_term_change() { co_await maybe_flush_manifest_clean_offset(); } - update_probe(); - // Drop _uploads_active lock: we are not considered active while // sleeping for backoff at the end of the loop. units.return_all(); @@ -896,22 +896,6 @@ ss::future ntp_archiver::sync_manifest() { co_return cloud_storage::download_result::success; } -void ntp_archiver::update_probe() { - const auto& man = manifest(); - - _probe->segments_in_manifest(man.size()); - - const auto first_addressable = man.first_addressable_segment(); - const auto truncated_seg_count = first_addressable == man.end() - ? 0 - : first_addressable.index(); - - _probe->segments_to_delete( - truncated_seg_count + man.replaced_segments_count()); - - _probe->cloud_log_size(man.cloud_log_size()); -} - bool ntp_archiver::can_update_archival_metadata() const { return !_as.abort_requested() && !_gate.is_closed() && _parent.is_leader() && _parent.term() == _start_term; diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index e3574d8488b78..51f4bc56d22be 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -621,8 +621,6 @@ class ntp_archiver { // the state of manifest uploads. bool uploaded_data_past_flush_offset() const; - void update_probe(); - /// Return true if archival metadata can be replicated. /// This means that the replica is a leader, the term did not /// change and the archiver is not stopping. diff --git a/src/v/archival/probe.cc b/src/v/archival/probe.cc index f7a1519f5bb0f..9dfe34e676ee0 100644 --- a/src/v/archival/probe.cc +++ b/src/v/archival/probe.cc @@ -10,6 +10,7 @@ #include "archival/probe.h" +#include "archival/archival_metadata_stm.h" #include "config/configuration.h" #include "metrics/prometheus_sanitize.h" @@ -19,7 +20,10 @@ namespace archival { ntp_level_probe::ntp_level_probe( - per_ntp_metrics_disabled disabled, const model::ntp& ntp) { + per_ntp_metrics_disabled disabled, + const model::ntp& ntp, + ss::shared_ptr stm) + : _stm(std::move(stm)) { if (!disabled) { setup_ntp_metrics(ntp); } @@ -118,21 +122,31 @@ void ntp_level_probe::setup_public_metrics(const model::ntp& ntp) { .aggregate(aggregate_labels), sm::make_gauge( "segments", - [this] { return _segments_in_manifest; }, + [this] { return _stm->manifest().size(); }, sm::description( "Total number of accounted segments in the cloud for the topic"), labels) .aggregate(aggregate_labels), sm::make_gauge( "segments_pending_deletion", - [this] { return _segments_to_delete; }, + [this] { + const auto first_addressable + = _stm->manifest().first_addressable_segment(); + const auto truncated_seg_count = first_addressable + == _stm->manifest().end() + ? 0 + : first_addressable.index(); + + return truncated_seg_count + + _stm->manifest().replaced_segments_count(); + }, sm::description("Total number of segments pending deletion from the " "cloud for the topic"), labels) .aggregate(aggregate_labels), sm::make_gauge( "cloud_log_size", - [this] { return _cloud_log_size; }, + [this] { return _stm->manifest().cloud_log_size(); }, sm::description( "Total size in bytes of the user-visible log for the topic"), labels) diff --git a/src/v/archival/probe.h b/src/v/archival/probe.h index ac079222d9c19..72712cb091dc4 100644 --- a/src/v/archival/probe.h +++ b/src/v/archival/probe.h @@ -19,6 +19,10 @@ #include +namespace cluster { +class archival_metadata_stm; +} + namespace archival { /// \brief Per-ntp archval service probe @@ -30,7 +34,10 @@ namespace archival { /// The unit of measure is offset delta. class ntp_level_probe { public: - ntp_level_probe(per_ntp_metrics_disabled disabled, const model::ntp& ntp); + ntp_level_probe( + per_ntp_metrics_disabled disabled, + const model::ntp& ntp, + ss::shared_ptr stm); ntp_level_probe(const ntp_level_probe&) = delete; ntp_level_probe& operator=(const ntp_level_probe&) = delete; ntp_level_probe(ntp_level_probe&&) = delete; @@ -56,12 +63,6 @@ class ntp_level_probe { _segments_deleted += deleted_count; }; - void segments_in_manifest(int64_t count) { _segments_in_manifest = count; }; - - void segments_to_delete(int64_t count) { _segments_to_delete = count; }; - - void cloud_log_size(uint64_t size) { _cloud_log_size = size; } - void compacted_replaced_bytes(size_t bytes) { _compacted_replaced_bytes = bytes; } @@ -77,17 +78,13 @@ class ntp_level_probe { int64_t _pending = 0; /// Number of segments deleted by garbage collection int64_t _segments_deleted = 0; - /// Number of accounted segments in the cloud - int64_t _segments_in_manifest = 0; - /// Number of segments awaiting deletion - int64_t _segments_to_delete = 0; - /// size in bytes of the user-visible log - uint64_t _cloud_log_size = 0; /// cloud bytes "removed" due to compaction operation size_t _compacted_replaced_bytes = 0; metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; + + ss::shared_ptr _stm; }; /// Metrics probe for upload housekeeping service diff --git a/tests/rptest/tests/usage_test.py b/tests/rptest/tests/usage_test.py index 4bb739389e61d..bbeeb32c5b75f 100644 --- a/tests/rptest/tests/usage_test.py +++ b/tests/rptest/tests/usage_test.py @@ -11,7 +11,7 @@ import time import random import operator -from rptest.services.redpanda import RedpandaService +from rptest.services.redpanda import MetricsEndpoint, RedpandaService from rptest.clients.rpk import RpkTool from requests.exceptions import HTTPError from rptest.services.cluster import cluster @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self): bucket_view = BucketView(self.redpanda) + def fetch_metric_usage(): + total_usage = 0 + for topic in self.topics: + usage = self.redpanda.metric_sum( + "redpanda_cloud_storage_cloud_log_size", + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS, + topic=topic.name) / topic.replication_factor + self.logger.info( + f"Metric reported cloud storage usage for topic {topic.name}: {usage}" + ) + total_usage += usage + return int(total_usage) + def check_usage(): # Check that the usage reporting system has reported correct values manifest_usage = bucket_view.cloud_log_sizes_sum().total( @@ -514,12 +527,19 @@ def check_usage(): self.logger.info( f"Max reported usages via kafka/usage_manager: {max(reported_usages)}" ) - return manifest_usage in reported_usages - - wait_until( - check_usage, - timeout_sec=30, - backoff_sec=1, - err_msg= - "Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage" - ) + if manifest_usage not in reported_usages: + self.logger.info( + f"Reported usages: {reported_usages} does not contain {manifest_usage}" + ) + return False + + metric_usage = fetch_metric_usage() + self.logger.info( + f"Metric reported cloud storage usage: {metric_usage}") + + return manifest_usage == metric_usage + + wait_until(check_usage, + timeout_sec=30, + backoff_sec=1, + err_msg="Inconsistent cloud storage usage reported")