[storage]: add local retention reclaimable to partition health report #12737

Merged: 7 commits, Aug 14, 2023
118 changes: 118 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -11,8 +11,10 @@
#include "archival/ntp_archiver_service.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/tests/manual_fixture.h"
#include "cloud_storage/tests/produce_utils.h"
#include "cloud_storage/tests/s3_imposter.h"
#include "cluster/health_monitor_frontend.h"
#include "config/configuration.h"
#include "kafka/server/tests/produce_consume_utils.h"
#include "model/fundamental.h"
@@ -464,3 +466,119 @@ FIXTURE_TEST(test_consume_during_spillover, cloud_storage_manual_e2e_test) {
}
cleanup.cancel();
}

FIXTURE_TEST(
reclaimable_reported_in_health_report,
cloud_storage_manual_multinode_test_base) {
config::shard_local_cfg().retention_local_trim_interval.set_value(
std::chrono::milliseconds(2000));

// start a second fixture and wait for stable setup
auto fx2 = start_second_fixture();
tests::cooperative_spin_wait_with_timeout(3s, [this] {
return app.controller->get_members_table().local().node_ids().size()
== 2;
}).get();

// test topic
const model::topic topic_name("tapioca");
model::ntp ntp(model::kafka_namespace, topic_name, 0);
cluster::topic_properties props;
props.shadow_indexing = model::shadow_indexing_mode::full;
props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion;
props.segment_size = 64_KiB;
props.retention_local_target_bytes = tristate<size_t>(1);
add_topic({model::kafka_namespace, topic_name}, 1, props, 2).get();

// figuring out the leader is useful for constructing the producer. the
// follower is just the "other" node.
redpanda_thread_fixture* fx_l = nullptr;
boost_require_eventually(10s, [&] {
cluster::partition* prt_a
= app.partition_manager.local().get(ntp).get();
cluster::partition* prt_b
= fx2->app.partition_manager.local().get(ntp).get();
if (!prt_a || !prt_b) {
return false;
}
if (prt_a->is_leader()) {
fx_l = this;
return true;
}
if (prt_b->is_leader()) {
fx_l = fx2.get();
return true;
}
return false;
});

auto prt_l = fx_l->app.partition_manager.local().get(ntp);

kafka_produce_transport producer(fx_l->make_kafka_client().get());
producer.start().get();

auto get_reclaimable = [&]() -> std::optional<std::vector<size_t>> {
auto report = app.controller->get_health_monitor()
.local()
.get_cluster_health(
cluster::cluster_report_filter{},
cluster::force_refresh::yes,
model::timeout_clock::now() + std::chrono::seconds(2))
.get();
if (report.has_value()) {
std::vector<size_t> sizes;
for (auto& node_report : report.value().node_reports) {
for (auto& topic : node_report.topics) {
if (
topic.tp_ns
!= model::topic_namespace_view(
model::kafka_namespace, topic_name)) {
continue;
}
for (auto partition : topic.partitions) {
sizes.push_back(
partition.reclaimable_size_bytes.value_or(0));
}
}
}
if (!sizes.empty()) {
return sizes;
}
}
return std::nullopt;
};

for (int j = 0; j < 20; j++) {
for (int i = 0; i < 200; i++) {
producer
.produce_to_partition(
topic_name,
model::partition_id(0),
tests::kv_t::sequence(0, 200))
.get();
}

// drive the uploading
auto& archiver = prt_l->archiver()->get();
archiver.sync_for_tests().get();
archiver.upload_next_candidates().get();

// not for synchronization... just to give the system time to propagate
// all the state changes that are happening so that this overall loop
// doesn't spin to completion too fast.
ss::sleep(std::chrono::seconds(2)).get();

auto sizes = get_reclaimable();
if (sizes.has_value()) {
BOOST_REQUIRE(!sizes->empty());
if (std::all_of(sizes->begin(), sizes->end(), [](size_t s) {
return s > 0;
})) {
return; // test success
}
}
}

// health report never reported non-zero reclaimable sizes. bummer!
BOOST_REQUIRE(false);
}
8 changes: 6 additions & 2 deletions src/v/cluster/health_monitor_backend.cc
@@ -671,6 +671,7 @@ struct ntp_report {
ntp_leader leader;
size_t size_bytes;
std::optional<uint8_t> under_replicated_replicas;
size_t reclaimable_size_bytes;
};

partition_status to_partition_status(const ntp_report& ntpr) {
@@ -680,7 +681,8 @@ partition_status to_partition_status(const ntp_report& ntpr) {
.leader_id = ntpr.leader.leader_id,
.revision_id = ntpr.leader.revision_id,
.size_bytes = ntpr.size_bytes,
.under_replicated_replicas = ntpr.under_replicated_replicas};
.under_replicated_replicas = ntpr.under_replicated_replicas,
.reclaimable_size_bytes = ntpr.reclaimable_size_bytes};
}

ss::chunked_fifo<ntp_report> collect_shard_local_reports(
@@ -703,6 +705,7 @@ ss::chunked_fifo<ntp_report> collect_shard_local_reports(
},
.size_bytes = p.second->size_bytes() + p.second->non_log_disk_size_bytes(),
.under_replicated_replicas = p.second->get_under_replicated(),
.reclaimable_size_bytes = p.second->reclaimable_local_size_bytes(),
};
});
} else {
@@ -715,8 +718,9 @@
.leader_id = partition->get_leader_id(),
.revision_id = partition->get_revision_id(),
},
.size_bytes = partition->size_bytes(),
.size_bytes = partition->size_bytes() + partition->non_log_disk_size_bytes(),
.under_replicated_replicas = partition->get_under_replicated(),
.reclaimable_size_bytes = partition->reclaimable_local_size_bytes(),
});
}
}
3 changes: 2 additions & 1 deletion src/v/cluster/health_monitor_types.cc
@@ -140,12 +140,13 @@ std::ostream& operator<<(std::ostream& o, const partition_status& ps) {
fmt::print(
o,
"{{id: {}, term: {}, leader_id: {}, revision_id: {}, size_bytes: {}, "
"under_replicated: {}}}",
"reclaimable_size_bytes: {}, under_replicated: {}}}",
ps.id,
ps.term,
ps.leader_id,
ps.revision_id,
ps.size_bytes,
ps.reclaimable_size_bytes,
ps.under_replicated_replicas);
return o;
}
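With reclaimable_size_bytes threaded into the formatter, a logged partition_status renders roughly as follows (illustrative values; the exact rendering of the optional fields depends on their fmt formatters):

{id: 0, term: 3, leader_id: {1}, revision_id: {12}, size_bytes: 1048576, reclaimable_size_bytes: {524288}, under_replicated: {0}}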
36 changes: 18 additions & 18 deletions src/v/cluster/health_monitor_types.h
@@ -57,23 +57,7 @@ struct node_state

struct partition_status
: serde::
envelope<partition_status, serde::version<1>, serde::compat_version<0>> {
/**
* We increase a version here 'backward' since incorrect assertion would
* cause older redpanda versions to crash.
*
* Version: -1: added revision_id field
* Version: -2: added size_bytes field
*
* Same versioning should also be supported in get_node_health_request
*/

static constexpr int8_t initial_version = 0;
static constexpr int8_t revision_id_version = -1;
static constexpr int8_t size_bytes_version = -2;

static constexpr int8_t current_version = size_bytes_version;

envelope<partition_status, serde::version<2>, serde::compat_version<0>> {
static constexpr size_t invalid_size_bytes = size_t(-1);

model::partition_id id;
@@ -83,14 +67,30 @@ struct partition_status
size_t size_bytes;
std::optional<uint8_t> under_replicated_replicas;

/*
* estimated amount of data above local retention that is subject to
* reclaim under disk pressure. this is useful for the partition balancer
* which is interested in free space on a node. a node may have very little
* physical free space, but have effective free space represented by
* reclaimable size bytes.
*
* an intuitive relationship between size_bytes and reclaimable_size_bytes
* would have the former being >= the latter. however, due to the way
* that data is collected it is conceivable that this inequality doesn't
* hold. callers should check for this condition and normalize the values or
* ignore the update.
*/
std::optional<size_t> reclaimable_size_bytes;

auto serde_fields() {
return std::tie(
id,
term,
leader_id,
revision_id,
size_bytes,
under_replicated_replicas);
under_replicated_replicas,
reclaimable_size_bytes);
}

friend std::ostream& operator<<(std::ostream&, const partition_status&);
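The interface comment on reclaimable_size_bytes above, and the review discussion on the truncate_prefix hunk below, both note that size_bytes and reclaimable_size_bytes are sampled at slightly different times, so the usual size_bytes >= reclaimable_size_bytes invariant can transiently fail. A minimal sketch of the sanitization a consumer such as the partition balancer might apply (hypothetical helper, not part of this diff):

#include <algorithm>
#include <cstddef>
#include <optional>

// hypothetical helper, not part of this PR: clamp the reported reclaimable
// bytes so they never exceed the reported partition size, treating a missing
// value (e.g. a report from an older node) as zero.
size_t sanitized_reclaimable(
  size_t size_bytes, std::optional<size_t> reclaimable_size_bytes) {
    return std::min(reclaimable_size_bytes.value_or(0), size_bytes);
}

// a caller could then treat size_bytes - sanitized_reclaimable(...) as the
// partition's effective footprint once local retention is enforced.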
4 changes: 4 additions & 0 deletions src/v/cluster/partition.h
@@ -275,6 +275,10 @@ class partition {

size_t size_bytes() const { return _raft->log()->size_bytes(); }

size_t reclaimable_local_size_bytes() const {
return _raft->log()->reclaimable_local_size_bytes();
}

uint64_t non_log_disk_size_bytes() const;

ss::future<> update_configuration(topic_properties);
37 changes: 36 additions & 1 deletion src/v/storage/disk_log_impl.cc
@@ -1529,7 +1529,18 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
vassert(!_closed, "truncate_prefix() on closed log - {}", *this);
return _failure_probes.truncate_prefix().then([this, cfg]() mutable {
// dispatch the actual truncation
return do_truncate_prefix(cfg);
return do_truncate_prefix(cfg)
.then([this] {
Comment on lines +1532 to +1533

Contributor:

Just noting there's still room for a race where the reported size is lower than the reclaimable size, I think. Since size_bytes gets updated in do_truncate() and there is this scheduling point here.

It seems hard to be robust, so it's probably fine leaving this, but with the caveat that partition balancing should sanitize these values before making decisions.

Member Author:

> It seems hard to be robust, so it's probably fine leaving this, but with the caveat that partition balancing should sanitize these values before making decisions.

yeh. i added a comment to the interface. flagging directly to @ztlpn for visibility.

/*
* after truncation do a quick refresh of cached variables that
* are computed during disk usage calculation. this is useful for
* providing more timely updates of reclaimable space through the
* health report.
*/
return disk_usage_and_reclaimable_space(
_manager.default_gc_config());
})
.discard_result();
});
}

@@ -2113,6 +2124,11 @@ disk_log_impl::disk_usage_and_reclaimable_space(gc_config input_cfg) {
.local_retention = lcl.total(),
};

/*
* cache this for access by the health report
*/
_reclaimable_local_size_bytes = reclaim.local_retention;

co_return std::make_pair(usage, reclaim);
}

@@ -2587,4 +2603,23 @@ disk_log_impl::get_reclaimable_offsets(gc_config cfg) {
co_return res;
}

size_t disk_log_impl::reclaimable_local_size_bytes() const {
/*
* the circumstances/configuration under which this log trims back to the
* local retention size may change. catch these before reporting potentially
* stale information.
*/
if (!is_cloud_retention_active()) {
return 0;
}
if (config().is_read_replica_mode_enabled()) {
// https://github.com/redpanda-data/redpanda/issues/11936
return 0;
}
if (deletion_exempt(config().ntp())) {
return 0;
}
return _reclaimable_local_size_bytes;
}

} // namespace storage
3 changes: 3 additions & 0 deletions src/v/storage/disk_log_impl.h
@@ -140,6 +140,8 @@ class disk_log_impl final : public log {
return _segments_rolling_lock.get_units();
}

size_t reclaimable_local_size_bytes() const override;

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
@@ -289,6 +291,7 @@ class disk_log_impl final : public log {
mutex _segments_rolling_lock;

std::optional<model::offset> _cloud_gc_offset;
size_t _reclaimable_local_size_bytes{0};
};

} // namespace storage
6 changes: 6 additions & 0 deletions src/v/storage/log.h
@@ -143,6 +143,12 @@ class log {

virtual probe& get_probe() = 0;

/*
* estimated amount of data beyond local retention. zero is returned in
* cases where this is not applicable, such as when the log is not TS-enabled.
*/
virtual size_t reclaimable_local_size_bytes() const = 0;

private:
ntp_config _config;

26 changes: 14 additions & 12 deletions src/v/storage/log_manager.cc
@@ -749,14 +749,7 @@ ss::future<usage_report> log_manager::disk_usage() {
* TODO: this will be factored out to make the sharing of settings easier to
* maintain.
*/
model::timestamp collection_threshold;
if (!_config.delete_retention()) {
collection_threshold = model::timestamp(0);
} else {
collection_threshold = model::timestamp(
model::timestamp::now().value()
- _config.delete_retention()->count());
}
auto cfg = default_gc_config();

fragmented_vector<ss::shared_ptr<log>> logs;
for (auto& it : _logs) {
@@ -766,10 +759,7 @@
co_return co_await ss::map_reduce(
logs.begin(),
logs.end(),
[this, collection_threshold](ss::shared_ptr<log> log) {
return log->disk_usage(
gc_config(collection_threshold, _config.retention_bytes()));
},
[cfg](ss::shared_ptr<log> log) { return log->disk_usage(cfg); },
usage_report{},
[](usage_report acc, usage_report update) { return acc + update; });
}
@@ -797,4 +787,16 @@ void log_manager::trigger_gc() {
_housekeeping_sem.signal();
}

gc_config log_manager::default_gc_config() const {
model::timestamp collection_threshold;
if (!_config.delete_retention()) {
collection_threshold = model::timestamp(0);
} else {
collection_threshold = model::timestamp(
model::timestamp::now().value()
- _config.delete_retention()->count());
}
return {collection_threshold, _config.retention_bytes()};
}

} // namespace storage
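A worked example of the threshold arithmetic in default_gc_config(), assuming model::timestamp counts milliseconds since the epoch (illustrative numbers):

// with delete_retention = 7 days = 604'800'000 ms and
// model::timestamp::now() = 1'692'000'000'000:
//   collection_threshold = 1'692'000'000'000 - 604'800'000
//                        = 1'691'395'200'000
// i.e. only data older than one week is eligible for time-based collection.
// with delete_retention disabled, the threshold is timestamp(0), so nothing
// qualifies for time-based collection.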
2 changes: 2 additions & 0 deletions src/v/storage/log_manager.h
@@ -236,6 +236,8 @@ class log_manager {
*/
void trigger_gc();

gc_config default_gc_config() const;

private:
using logs_type
= absl::flat_hash_map<model::ntp, std::unique_ptr<log_housekeeping_meta>>;