Merge pull request #11702 from dotnwat/space-management-v2
storage: add greedy log storage target size control loop monitor
jcsp authored Jun 28, 2023
2 parents 6439825 + 0be1c0d commit 9135779
Showing 12 changed files with 359 additions and 46 deletions.
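In short: this commit wires a new optional cluster property, log_storage_target_size, into the disk space manager and adds a periodic control loop. When the property is set, the loop compares total local log usage against the target; if usage exceeds it, the loop first leans on ordinary garbage collection and, if that is not expected to be enough, overrides local retention for tiered-storage (cloud-enabled) topics so that data already uploaded to object storage can be reclaimed locally. As a worked example with illustrative numbers only: with a 2.0 TB target and 2.6 TB of usage the excess is 0.6 TB; if normal retention is estimated to recover 0.4 TB, the loop asks cloud-backed partitions to give up roughly the remaining 0.2 TB.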
9 changes: 9 additions & 0 deletions src/v/config/configuration.cc
@@ -110,6 +110,15 @@ configuration::configuration()
.example = "31536000000",
.visibility = visibility::tunable},
24h * 365)
, log_storage_target_size(
*this,
"log_storage_target_size",
"The target size in bytes that log storage will try meet. When no target "
"is specified storage usage is unbounded.",
{.needs_restart = needs_restart::no,
.example = "2147483648000",
.visibility = visibility::tunable},
std::nullopt)
, rpc_server_listen_backlog(
*this,
"rpc_server_listen_backlog",
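Usage note (an assumption about surrounding tooling, not part of this diff): since the property is declared with needs_restart::no and tunable visibility, it should be adjustable at runtime through the usual cluster configuration path, for example something like rpk cluster config set log_storage_target_size 2147483648000 for a roughly 2 TB (2000 GiB) target; leaving the property unset keeps local log storage unbounded, as the description above states.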
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -54,6 +54,7 @@ struct configuration final : public config_store {
bounded_property<std::optional<std::chrono::milliseconds>> log_segment_ms;
property<std::chrono::milliseconds> log_segment_ms_min;
property<std::chrono::milliseconds> log_segment_ms_max;
property<std::optional<uint64_t>> log_storage_target_size;

// Network
bounded_property<std::optional<int>> rpc_server_listen_backlog;
1 change: 1 addition & 0 deletions src/v/redpanda/application.cc
@@ -1361,6 +1361,7 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
construct_single_service(
space_manager,
config::shard_local_cfg().enable_storage_space_manager.bind(),
config::shard_local_cfg().log_storage_target_size.bind(),
&storage,
&storage_node,
&shadow_index_cache,
198 changes: 154 additions & 44 deletions src/v/resource_mgmt/storage.cc
@@ -13,6 +13,8 @@

#include "cloud_storage/cache_service.h"
#include "cluster/partition_manager.h"
#include "storage/disk_log_impl.h"
#include "utils/human.h"
#include "vlog.h"

#include <seastar/util/log.hh>
@@ -23,6 +25,7 @@ namespace storage {

disk_space_manager::disk_space_manager(
config::binding<bool> enabled,
config::binding<std::optional<uint64_t>> log_storage_target_size,
ss::sharded<storage::api>* storage,
ss::sharded<storage::node>* storage_node,
ss::sharded<cloud_storage::cache>* cache,
@@ -31,7 +34,8 @@ disk_space_manager::disk_space_manager(
, _storage(storage)
, _storage_node(storage_node)
, _cache(cache->local_is_initialized() ? cache : nullptr)
, _pm(pm) {
, _pm(pm)
, _log_storage_target_size(std::move(log_storage_target_size)) {
_enabled.watch([this] {
vlog(
rlog.info,
@@ -74,10 +78,13 @@ ss::future<> disk_space_manager::run_loop() {
vassert(ss::this_shard_id() == run_loop_core, "Run on wrong core");

/*
* we want the code here to actually run a little, but the final shape of
* configuration options is not yet known.
* In the short term this frequency should be low enough to provide decent
* reactivity to changes in space usage, but not so low as to do an
* excessive amount of work. Medium term we want storage to trigger an
* upcall to start the monitor loop when it appears that we are getting
* close to an important threshold.
*/
constexpr auto frequency = std::chrono::seconds(30);
constexpr auto frequency = std::chrono::seconds(5);

while (!_gate.is_closed()) {
try {
@@ -95,60 +102,163 @@
continue;
}

/*
* Collect cache and logs storage usage information. These accumulate
* across all shards (despite the local() accessor). If a failure occurs
* we wait rather than operate with a reduced set of information.
*/
cloud_storage::cache_usage_target cache_usage_target;
try {
cache_usage_target
= co_await _pm->local().get_cloud_cache_disk_usage_target();
} catch (...) {
vlog(
rlog.info,
"Unable to collect cloud cache usage: {}",
std::current_exception());
if (_log_storage_target_size().has_value()) {
co_await manage_data_disk(_log_storage_target_size().value());
}
}
}

/*
* Attempts to set retention offset for cloud enabled topics such that the
* target amount of bytes will be recovered when garbage collection is applied.
*
* This is a greedy approach which will recover as much as possible from each
* partition before moving on to the next.
*/
static ss::future<size_t>
set_partition_retention_offsets(cluster::partition_manager& pm, size_t target) {
// build a lightweight copy to avoid invalidations during iteration
fragmented_vector<ss::lw_shared_ptr<cluster::partition>> partitions;
for (const auto& p : pm.partitions()) {
if (!p.second->remote_partition()) {
continue;
}
partitions.push_back(p.second);
}

size_t partitions_total = 0;
for (const auto& p : partitions) {
if (partitions_total >= target) {
break;
}

storage::usage_report logs_usage;
try {
logs_usage = co_await _storage->local().disk_usage();
} catch (...) {
vlog(
rlog.info,
"Unable to collect log storage usage: {}",
std::current_exception());
auto log = dynamic_cast<storage::disk_log_impl*>(p->log().get_impl());
// make sure housekeeping doesn't delete the log from below us
auto gate = log->gate().hold();

auto segments = log->cloud_gc_eligible_segments();
if (segments.empty()) {
continue;
}

size_t log_total = 0;
auto offset = segments.front()->offsets().committed_offset;
for (const auto& seg : segments) {
auto usage = co_await seg->persistent_size();
log_total += usage.total();
if (log_total >= target) {
break;
}
}

vlog(
rlog.info,
"Setting retention offset override {} estimated reclaim of {} for "
"cloud topic {}",
offset,
log_total,
p->ntp());

log->set_cloud_gc_offset(offset);
partitions_total += log_total;
}

co_return partitions_total;
}

ss::future<> disk_space_manager::manage_data_disk(uint64_t target_size) {
/*
* query log storage usage across all cores
*/
storage::usage_report usage;
try {
usage = co_await _storage->local().disk_usage();
} catch (...) {
vlog(
rlog.debug,
"Cloud storage cache target minimum size {} nice to have {}",
cache_usage_target.target_min_bytes,
cache_usage_target.target_bytes);
rlog.info,
"Unable to collect log storage usage. Skipping management tick: "
"{}",
std::current_exception());
co_return;
}

/*
* how much data from log storage do we need to remove from disk to be able
* to stay below the current target size?
*/
const auto target_excess = usage.usage.total() < target_size
? 0
: usage.usage.total() - target_size;
if (target_excess <= 0) {
vlog(
rlog.debug,
"Log storage usage total {} - data {} index {} compaction {}",
logs_usage.usage.total(),
logs_usage.usage.data,
logs_usage.usage.index,
logs_usage.usage.compaction);
rlog.info,
"Log storage usage {} <= target size {}. No work to do.",
human::bytes(usage.usage.total()),
human::bytes(target_size));
co_return;
}

/*
* when log storage has exceeded the target usage, then there are some knobs
* that can be adjusted to help stay below this target.
*
* the first knob is to prioritize garbage collection. this is normally a
* periodic task performed by the storage layer. if it hasn't run recently,
* or it has been spending most of its time doing low impact compaction
* work, then we can instead immediately begin applying retention rules.
*
* the effect of gc is defined entirely by topic configuration and current
* state of storage. so in order to turn the first knob we need only trigger
* gc across shards.
*
* however, if current retention is not sufficient to bring usage below the
* target, then a second knob must be turned: overriding local retention
* targets for cloud-enabled topics, removing data that has been backed up
* into the cloud.
*/
if (target_excess > usage.reclaim.retention) {
vlog(
rlog.debug,
"Log storage usage available for reclaim local {} total {}",
logs_usage.reclaim.retention,
logs_usage.reclaim.available);
rlog.info,
"Log storage usage {} > target size {} by {}. Garbage collection "
"expected to recover {}. Overriding tiered storage retention to "
"recover {}. Total estimated available to recover {}",
human::bytes(usage.usage.total()),
human::bytes(target_size),
human::bytes(target_excess),
human::bytes(usage.reclaim.retention),
human::bytes(target_excess - usage.reclaim.retention),
human::bytes(usage.reclaim.available));

/*
* This is a simple greedy approach. It will attempt to reclaim as much
* data as possible from each partition on each core, stopping once
* enough space has been reclaimed to meet the current target.
*/
size_t total = 0;
for (auto shard : ss::smp::all_cpus()) {
auto goal = target_excess - total;
total += co_await _pm->invoke_on(shard, [goal](auto& pm) {
return set_partition_retention_offsets(pm, goal);
});
if (total >= target_excess) {
break;
}
}
} else {
vlog(
rlog.debug,
"Log storage usage target minimum size {} nice to have {}",
logs_usage.target.min_capacity,
logs_usage.target.min_capacity_wanted);
rlog.info,
"Log storage usage {} > target size {} by {}. Garbage collection "
"expected to recover {}.",
human::bytes(usage.usage.total()),
human::bytes(target_size),
human::bytes(target_excess),
human::bytes(usage.reclaim.retention));
}

/*
* ask storage across all nodes to apply retention rules asap.
*/
co_await _storage->invoke_on_all([](api& api) { api.trigger_gc(); });
}

} // namespace storage
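To make the greedy policy above easier to follow, here is a small standalone sketch. It is illustrative only: partition_model and greedy_reclaim are invented names, it is not Redpanda code, and it ignores offsets, gates and sharding. It shows the same accumulation pattern, taking reclaimable bytes partition by partition, oldest segments first, and stopping once the target excess is covered:

// Standalone sketch of the greedy reclaim selection (illustrative only).
#include <cstdint>
#include <cstdio>
#include <vector>

struct partition_model {
    const char* ntp;                     // illustrative partition name
    std::vector<uint64_t> segment_bytes; // reclaimable segments, oldest first
};

// Walk partitions greedily, scheduling bytes for reclaim until the target
// excess is covered; returns the total number of bytes scheduled.
static uint64_t greedy_reclaim(
  const std::vector<partition_model>& partitions, uint64_t target_excess) {
    uint64_t total = 0;
    for (const auto& p : partitions) {
        if (total >= target_excess) {
            break; // enough space has already been found
        }
        uint64_t from_partition = 0;
        for (uint64_t seg : p.segment_bytes) {
            from_partition += seg;
            if (total + from_partition >= target_excess) {
                break; // stop adding segments once the excess is covered
            }
        }
        std::printf(
          "%s: schedule %llu bytes for reclaim\n",
          p.ntp,
          static_cast<unsigned long long>(from_partition));
        total += from_partition;
    }
    return total;
}

int main() {
    // e.g. usage 600 MB over the target (illustrative numbers)
    std::vector<partition_model> partitions = {
      {"kafka/topic-a/0", {256'000'000, 256'000'000, 256'000'000}},
      {"kafka/topic-b/0", {128'000'000, 128'000'000}},
    };
    auto scheduled = greedy_reclaim(partitions, 600'000'000);
    std::printf(
      "total scheduled: %llu bytes\n",
      static_cast<unsigned long long>(scheduled));
    return 0;
}

In the actual change, the analogous accumulation runs per shard, records a retention offset override on each selected partition via set_cloud_gc_offset, and then relies on the storage layer's garbage collection (triggered at the end of manage_data_disk) to perform the deletion.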
4 changes: 4 additions & 0 deletions src/v/resource_mgmt/storage.h
@@ -42,6 +42,7 @@ class disk_space_manager {
public:
disk_space_manager(
config::binding<bool> enabled,
config::binding<std::optional<uint64_t>> log_storage_target_size,
ss::sharded<storage::api>* storage,
ss::sharded<storage::node>* storage_node,
ss::sharded<cloud_storage::cache>* cache,
@@ -69,6 +70,9 @@ class disk_space_manager {
node::disk_space_info _cache_disk_info{};
node::disk_space_info _data_disk_info{};

ss::future<> manage_data_disk(uint64_t target_size);
config::binding<std::optional<uint64_t>> _log_storage_target_size;

ss::gate _gate;
ss::future<> run_loop();
ssx::semaphore _control_sem{0, "resource_mgmt::space_manager"};
2 changes: 2 additions & 0 deletions src/v/storage/api.cc
@@ -34,4 +34,6 @@ void api::handle_disk_notification(
}
}

void api::trigger_gc() { _log_mgr->trigger_gc(); }

} // namespace storage
5 changes: 5 additions & 0 deletions src/v/storage/api.h
@@ -85,6 +85,11 @@ class api : public ss::peering_sharded_service<api> {
uint64_t free_space,
storage::disk_space_alert alert);

/*
* ask local log manager to trigger gc
*/
void trigger_gc();

private:
storage_resources _resources;
