Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archival: consistent log size probes across replicas #24257

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ redpanda_cc_library(
"archival/scrubber.h",
"archival/scrubber_scheduler.h",
"archival/segment_reupload.h",
"archival/stm_subscriptions.h",
"archival/types.h",
"archival/upload_controller.h",
"archival/upload_housekeeping_service.h",
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,9 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) {

// The offset should only be advanced after all the changes are applied.
_manifest->advance_insync_offset(b.last_offset());

// Notify subscribers that the STM has been updated.
_subscriptions.notify();
}

ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
Expand Down Expand Up @@ -1240,6 +1243,9 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
next_offset,
start_offset,
get_last_offset());

// Notify subscribers that the STM has been updated.
_subscriptions.notify();
}

ss::future<> archival_metadata_stm::apply_local_snapshot(
Expand Down Expand Up @@ -1310,6 +1316,10 @@ ss::future<> archival_metadata_stm::apply_local_snapshot(
} else {
_last_clean_at = header.offset;
}

// Notify subscribers that the STM has been updated.
_subscriptions.notify();

co_return;
}

Expand Down
16 changes: 16 additions & 0 deletions src/v/cluster/archival/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/types.h"
#include "cluster/archival/stm_subscriptions.h"
#include "cluster/errc.h"
#include "cluster/state_machine_registry.h"
#include "cluster/topic_table.h"
Expand Down Expand Up @@ -287,6 +288,18 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
return _remote_path_provider;
}

// Subscribe to STM state changes.
[[nodiscard("return value must be used to unsubscribe")]] archival::
stm_subscriptions::id_t
subscribe_to_state_change(ss::noncopyable_function<void() noexcept> f) {
return _subscriptions.subscribe(std::move(f));
}

// Unsubscribe from STM state changes.
void unsubscribe_from_state_change(archival::stm_subscriptions::id_t id) {
_subscriptions.unsubscribe(id);
}

private:
ss::future<bool>
do_sync(model::timeout_clock::duration timeout, ss::abort_source* as);
Expand Down Expand Up @@ -394,6 +407,9 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
// the change in size in cloud storage until the original segment(s) are
// garbage collected.
size_t _compacted_replaced_bytes{0};

// Subscriptions to STM state changes.
archival::stm_subscriptions _subscriptions;
};

class archival_metadata_stm_factory : public state_machine_factory {
Expand Down
19 changes: 14 additions & 5 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) {
}
if (!_probe) {
_probe.emplace(_conf->ntp_metrics_disabled, _ntp);

// Ensure we are exception safe and won't leave the probe without
// a watcher in case of exceptions. Also ensures we won't crash calling
// the callback.
static_assert(noexcept(update_probe()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could add this requirement to parameter to subscribe_to_state_change ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there subscribe_to_state_change(ss::noncopyable_function<void() noexcept> f) unless I got your comment wrong


update_probe();
_stm_sub_id = _parent.archival_meta_stm()->subscribe_to_state_change(
[this]() noexcept { update_probe(); });
}

while (!_as.abort_requested()) {
Expand Down Expand Up @@ -797,8 +806,6 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
co_await maybe_flush_manifest_clean_offset();
}

update_probe();

// Drop _uploads_active lock: we are not considered active while
// sleeping for backoff at the end of the loop.
units.return_all();
Expand Down Expand Up @@ -1062,8 +1069,6 @@ ss::future<> ntp_archiver::upload_until_term_change() {
// flush it for them.
co_await maybe_flush_manifest_clean_offset();
}

update_probe();
}
}

Expand Down Expand Up @@ -1163,7 +1168,7 @@ ss::future<cloud_storage::download_result> ntp_archiver::sync_manifest() {
co_return cloud_storage::download_result::success;
}

void ntp_archiver::update_probe() {
void ntp_archiver::update_probe() noexcept {
const auto& man = manifest();

_probe->segments_in_manifest(man.size());
Expand All @@ -1189,6 +1194,10 @@ bool ntp_archiver::may_begin_uploads() const {
}

ss::future<> ntp_archiver::stop() {
if (_stm_sub_id != archival::stm_subscriptions::id_t{}) {
WillemKauf marked this conversation as resolved.
Show resolved Hide resolved
_parent.archival_meta_stm()->unsubscribe_from_state_change(_stm_sub_id);
}

if (_local_segment_merger) {
if (!_local_segment_merger->interrupted()) {
_local_segment_merger->interrupt();
Expand Down
6 changes: 5 additions & 1 deletion src/v/cluster/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

#pragma once

#include "cloud_storage/cache_service.h"
#include "cloud_storage/fwd.h"
#include "cloud_storage/partition_manifest.h"
Expand All @@ -21,6 +22,7 @@
#include "cluster/archival/archiver_scheduler_api.h"
#include "cluster/archival/probe.h"
#include "cluster/archival/scrubber.h"
#include "cluster/archival/stm_subscriptions.h"
#include "cluster/archival/types.h"
#include "cluster/fwd.h"
#include "config/property.h"
Expand Down Expand Up @@ -633,7 +635,7 @@ class ntp_archiver {
// the state of manifest uploads.
bool uploaded_data_past_flush_offset() const;

void update_probe();
void update_probe() noexcept;

/// Return true if archival metadata can be replicated.
/// This means that the replica is a leader, the term did not
Expand Down Expand Up @@ -768,6 +770,8 @@ class ntp_archiver {
config::binding<std::chrono::milliseconds> _initial_backoff;
config::binding<std::chrono::milliseconds> _max_backoff;

archival::stm_subscriptions::id_t _stm_sub_id;

friend class archiver_fixture;
};

Expand Down
59 changes: 59 additions & 0 deletions src/v/cluster/archival/stm_subscriptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/seastarx.h"
#include "utils/named_type.h"

#include <seastar/util/noncopyable_function.hh>

#include <absl/container/flat_hash_map.h>

#include <cstdint>

namespace archival {

/// A manager for archival stm state changes subscriptions.
class stm_subscriptions {
public:
using id_t = named_type<int32_t, struct id_tag>;
using cb_t = ss::noncopyable_function<void() noexcept>;

/// Subscribe to state changes. The callback will be called every time the
/// state of the STM changes. The subscription ID is returned and must be
/// used to unsubscribe. It is unique for the lifetime of the STM.
///
/// If you need to store subscriptions for multiple STMs in the same
/// container, augment the id with additional information like to
/// distinguish between different STMs.
[[nodiscard("return value must be used to unsubscribe")]] id_t
subscribe(cb_t f) {
auto id = _next_sub_id++;
_subscriptions.emplace(id, std::move(f));
return id;
}

/// Unsubscribe from state changes if subscription id exists. If the
/// subscription id does not exist, the method is a no-op.
void unsubscribe(id_t id) { _subscriptions.erase(id); }

/// Notify all subscribers.
void notify() {
for (auto& [_, f] : _subscriptions) {
f();
}
}

private:
absl::flat_hash_map<id_t, cb_t> _subscriptions;
id_t _next_sub_id{0};
};

}; // namespace archival
40 changes: 30 additions & 10 deletions tests/rptest/tests/usage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import time
import random
import operator
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda import MetricsEndpoint, RedpandaService
from rptest.clients.rpk import RpkTool
from requests.exceptions import HTTPError
from rptest.services.cluster import cluster
Expand Down Expand Up @@ -498,6 +498,19 @@ def test_usage_manager_cloud_storage(self):

bucket_view = BucketView(self.redpanda)

def fetch_metric_usage():
total_usage = 0
for topic in self.topics:
usage = self.redpanda.metric_sum(
"redpanda_cloud_storage_cloud_log_size",
metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS,
topic=topic.name) / topic.replication_factor
self.logger.info(
f"Metric reported cloud storage usage for topic {topic.name}: {usage}"
)
total_usage += usage
return int(total_usage)

def check_usage():
# Check that the usage reporting system has reported correct values
manifest_usage = bucket_view.cloud_log_sizes_sum().total(
Expand All @@ -514,12 +527,19 @@ def check_usage():
self.logger.info(
f"Max reported usages via kafka/usage_manager: {max(reported_usages)}"
)
return manifest_usage in reported_usages

wait_until(
check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg=
"Reported cloud storage usage (via usage endpoint) did not match the manifest inferred usage"
)
if manifest_usage not in reported_usages:
self.logger.info(
f"Reported usages: {reported_usages} does not contain {manifest_usage}"
)
return False

metric_usage = fetch_metric_usage()
self.logger.info(
f"Metric reported cloud storage usage: {metric_usage}")

return manifest_usage == metric_usage

wait_until(check_usage,
timeout_sec=30,
backoff_sec=1,
err_msg="Inconsistent cloud storage usage reported")