Skip to content

Commit

Permalink
archival: consistent log size probes across replicas
Browse files Browse the repository at this point in the history
We called update probe only from leaders and after exiting the upload
loop which led to inconsistent and stale metrics.

Fix this by introducing a subscription mechanism to the STM which
is the source of truth for the manifest state and must be consistent
across all replicas.

Another option is to report metrics only from the leader :think:
  • Loading branch information
nvartolomei committed Nov 22, 2024
1 parent 1d42061 commit 6e467d4
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ redpanda_cc_library(
"archival/scrubber.h",
"archival/scrubber_scheduler.h",
"archival/segment_reupload.h",
"archival/stm_subscriptions.h",
"archival/types.h",
"archival/upload_controller.h",
"archival/upload_housekeeping_service.h",
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,9 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) {

// The offset should only be advanced after all the changes are applied.
_manifest->advance_insync_offset(b.last_offset());

// Notify subscribers that the STM has been updated.
_subscriptions.notify();
}

ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
Expand Down Expand Up @@ -1235,6 +1238,9 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
next_offset,
start_offset,
get_last_offset());

// Notify subscribers that the STM has been updated.
_subscriptions.notify();
}

ss::future<> archival_metadata_stm::apply_local_snapshot(
Expand Down Expand Up @@ -1305,6 +1311,10 @@ ss::future<> archival_metadata_stm::apply_local_snapshot(
} else {
_last_clean_at = header.offset;
}

// Notify subscribers that the STM has been updated.
_subscriptions.notify();

co_return;
}

Expand Down
15 changes: 15 additions & 0 deletions src/v/cluster/archival/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/types.h"
#include "cluster/archival/stm_subscriptions.h"
#include "cluster/errc.h"
#include "cluster/state_machine_registry.h"
#include "cluster/topic_table.h"
Expand Down Expand Up @@ -287,6 +288,17 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
return _remote_path_provider;
}

// Subscribe to STM state changes.
archival::stm_subscriptions::id_t
subscribe_to_state_change(ss::noncopyable_function<void() noexcept> f) {
return _subscriptions.subscribe(std::move(f));
}

// Unsubscribe from STM state changes.
void unsubscribe_from_state_change(archival::stm_subscriptions::id_t id) {
_subscriptions.unsubscribe(id);
}

private:
ss::future<bool>
do_sync(model::timeout_clock::duration timeout, ss::abort_source* as);
Expand Down Expand Up @@ -394,6 +406,9 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
// the change in size in cloud storage until the original segment(s) are
// garbage collected.
size_t _compacted_replaced_bytes{0};

// Subscriptions to STM state changes.
archival::stm_subscriptions _subscriptions;
};

class archival_metadata_stm_factory : public state_machine_factory {
Expand Down
19 changes: 14 additions & 5 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) {
}
if (!_probe) {
_probe.emplace(_conf->ntp_metrics_disabled, _ntp);

// Ensure we are exception safe and won't leave the probe without
// a watcher in case of exceptions. Also ensures we won't crash calling
// the callback.
static_assert(noexcept(update_probe()));

update_probe();
_parent.archival_meta_stm()->subscribe_to_state_change(
[this]() noexcept { update_probe(); });
}

while (!_as.abort_requested()) {
Expand Down Expand Up @@ -797,8 +806,6 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
co_await maybe_flush_manifest_clean_offset();
}

update_probe();

// Drop _uploads_active lock: we are not considered active while
// sleeping for backoff at the end of the loop.
units.return_all();
Expand Down Expand Up @@ -1062,8 +1069,6 @@ ss::future<> ntp_archiver::upload_until_term_change() {
// flush it for them.
co_await maybe_flush_manifest_clean_offset();
}

update_probe();
}
}

Expand Down Expand Up @@ -1163,7 +1168,7 @@ ss::future<cloud_storage::download_result> ntp_archiver::sync_manifest() {
co_return cloud_storage::download_result::success;
}

void ntp_archiver::update_probe() {
void ntp_archiver::update_probe() noexcept {
const auto& man = manifest();

_probe->segments_in_manifest(man.size());
Expand All @@ -1189,6 +1194,10 @@ bool ntp_archiver::may_begin_uploads() const {
}

ss::future<> ntp_archiver::stop() {
if (_stm_sub_id != archival::stm_subscriptions::id_t{}) {
_parent.archival_meta_stm()->unsubscribe_from_state_change(_stm_sub_id);
}

if (_local_segment_merger) {
if (!_local_segment_merger->interrupted()) {
_local_segment_merger->interrupt();
Expand Down
6 changes: 5 additions & 1 deletion src/v/cluster/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

#pragma once

#include "cloud_storage/cache_service.h"
#include "cloud_storage/fwd.h"
#include "cloud_storage/partition_manifest.h"
Expand All @@ -21,6 +22,7 @@
#include "cluster/archival/archiver_scheduler_api.h"
#include "cluster/archival/probe.h"
#include "cluster/archival/scrubber.h"
#include "cluster/archival/stm_subscriptions.h"
#include "cluster/archival/types.h"
#include "cluster/fwd.h"
#include "config/property.h"
Expand Down Expand Up @@ -633,7 +635,7 @@ class ntp_archiver {
// the state of manifest uploads.
bool uploaded_data_past_flush_offset() const;

void update_probe();
void update_probe() noexcept;

/// Return true if archival metadata can be replicated.
/// This means that the replica is a leader, the term did not
Expand Down Expand Up @@ -768,6 +770,8 @@ class ntp_archiver {
config::binding<std::chrono::milliseconds> _initial_backoff;
config::binding<std::chrono::milliseconds> _max_backoff;

archival::stm_subscriptions::id_t _stm_sub_id;

friend class archiver_fixture;
};

Expand Down
49 changes: 49 additions & 0 deletions src/v/cluster/archival/stm_subscriptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/seastarx.h"
#include "utils/named_type.h"

#include <seastar/util/noncopyable_function.hh>

#include <absl/container/flat_hash_map.h>

#include <cstdint>

namespace archival {

/// A class that allows to subscribe to state changes in the archival metadata
/// STM.
class stm_subscriptions {
public:
using id_t = named_type<int32_t, struct id_tag>;

id_t subscribe(ss::noncopyable_function<void() noexcept> f) {
auto id = _next_sub_id++;
_subscriptions.emplace(id, std::move(f));
return id;
}

void unsubscribe(id_t id) { _subscriptions.erase(id); }

void notify() {
for (auto& [_, f] : _subscriptions) {
f();
}
}

private:
absl::flat_hash_map<id_t, ss::noncopyable_function<void() noexcept>>
_subscriptions;
id_t _next_sub_id{0};
};

}; // namespace archival
40 changes: 30 additions & 10 deletions tests/rptest/tests/usage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import time
import random
import operator
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda import MetricsEndpoint, RedpandaService
from rptest.clients.rpk import RpkTool
from requests.exceptions import HTTPError
from rptest.services.cluster import cluster
Expand Down Expand Up @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self):

bucket_view = BucketView(self.redpanda)

def fetch_metric_usage():
total_usage = 0
for topic in self.topics:
usage = self.redpanda.metric_sum(
"redpanda_cloud_storage_cloud_log_size",
metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS,
topic=topic.name) / topic.replication_factor
self.logger.info(
f"Metric reported cloud storage usage for topic {topic.name}: {usage}"
)
total_usage += usage
return int(total_usage)

def check_usage():
# Check that the usage reporting system has reported correct values
manifest_usage = bucket_view.cloud_log_sizes_sum().total(
Expand All @@ -514,12 +527,19 @@ def check_usage():
self.logger.info(
f"Max reported usages via kafka/usage_manager: {max(reported_usages)}"
)
return manifest_usage in reported_usages

wait_until(
check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg=
"Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage"
)
if manifest_usage not in reported_usages:
self.logger.info(
f"Reported usages: {reported_usages} does not contain {manifest_usage}"
)
return False

metric_usage = fetch_metric_usage()
self.logger.info(
f"Metric reported cloud storage usage: {metric_usage}")

return manifest_usage == metric_usage

wait_until(check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg="Inconsistent cloud storage usage reported")

0 comments on commit 6e467d4

Please sign in to comment.