Merge pull request #18756 from jcipar/jcipar/cache-trim-settings
cloud: Add config options to trigger cache trim
jcipar authored Jun 10, 2024
2 parents dcbba8c + 027632c commit dfff9c2
Showing 6 changed files with 183 additions and 14 deletions.
78 changes: 65 additions & 13 deletions src/v/cloud_storage/cache_service.cc
@@ -25,6 +25,7 @@
#include <seastar/core/fstream.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/defer.hh>
@@ -35,6 +36,7 @@
#include <algorithm>
#include <exception>
#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string_view>

@@ -137,10 +139,7 @@ void cache::update_max_bytes() {
_max_percent());

if (_current_cache_size > _max_bytes) {
ssx::spawn_with_gate(_gate, [this]() {
return ss::with_semaphore(
_cleanup_sm, 1, [this]() { return trim_throttled(); });
});
ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); });
}
}

@@ -266,7 +265,9 @@ std::optional<std::chrono::milliseconds> cache::get_trim_delay() const {
}
}

ss::future<> cache::trim_throttled() {
ss::future<> cache::trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
// If we trimmed very recently then do not do it immediately:
// this reduces load and improves chance of currently promoted
// segments finishing their read work before we demote their
@@ -282,7 +283,15 @@ ss::future<> cache::trim_throttled() {
co_await ss::sleep_abortable(*trim_delay, _as);
}

co_await trim();
co_await trim(size_limit_override, object_limit_override);
}

ss::future<> cache::trim_throttled(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled_unlocked(
size_limit_override, object_limit_override);
}

ss::future<> cache::trim_manually(
@@ -1182,10 +1191,7 @@ ss::future<> cache::put(
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
{
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled();
}
co_await trim_throttled();

throw disk_full_error;
}
@@ -1496,9 +1502,55 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) {
co_return result;
}

void cache::maybe_background_trim() {
auto& trim_threshold_pct_objects
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_objects;
auto& trim_threshold_pct_size
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_size;
if (
!trim_threshold_pct_size.value().has_value()
&& !trim_threshold_pct_objects.value().has_value()) {
return;
}

uint64_t target_bytes = uint64_t(
_max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0);
uint32_t target_objects = uint32_t(
_max_objects() * trim_threshold_pct_objects.value().value_or(100.0)
/ 100.0);

bool bytes_over_limit = _current_cache_size + _reserved_cache_size
> target_bytes;
bool objects_over_limit = _current_cache_objects + _reserved_cache_objects
> target_objects;

if (bytes_over_limit || objects_over_limit) {
auto units = ss::try_get_units(_cleanup_sm, 1);
if (units.has_value()) {
vlog(cst_log.debug, "Spawning background trim");
ssx::spawn_with_gate(
_gate,
[this,
target_bytes,
target_objects,
u = std::move(units)]() mutable {
return trim_throttled_unlocked(target_bytes, target_objects)
.finally([u = std::move(u)] {});
});
} else {
vlog(
cst_log.debug, "Not spawning background trim: already started");
}
}
}

ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0");

maybe_background_trim();

if (may_reserve_space(bytes, objects)) {
// Fast path: space was available.
_reserved_cache_size += bytes;
@@ -1575,7 +1627,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// We want number of full trims to match number of carryover trims.
vlog(cst_log.debug, "Spawning background trim_throttled");
ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable {
return trim_throttled().finally([u = std::move(u)] {});
return trim_throttled_unlocked(std::nullopt, std::nullopt)
.finally([u = std::move(u)] {});
});
co_return;
}
@@ -1608,7 +1661,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// After taking lock, there still isn't space: means someone
// else didn't take it and free space for us already, so we will
// do the trim.
co_await trim_throttled();
co_await trim_throttled_unlocked();
did_trim = true;
}

@@ -1756,5 +1809,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) {
co_await ss::recursive_touch_directory(cache_dir.string());
}
}

} // namespace cloud_storage
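
For reference, here is a minimal standalone sketch of the trigger arithmetic that cache::maybe_background_trim() applies to the two new options; the helper struct and function names are illustrative, not part of the commit:

#include <cstdint>
#include <optional>

// Hypothetical helper mirroring maybe_background_trim(): an unset percentage
// falls back to 100%, i.e. a background trim starts only once the cache is
// effectively full.
struct trim_trigger {
    uint64_t target_bytes;
    uint32_t target_objects;
};

inline trim_trigger compute_trigger(
  uint64_t max_bytes,
  uint32_t max_objects,
  std::optional<double> pct_size,
  std::optional<double> pct_objects) {
    return trim_trigger{
      .target_bytes = uint64_t(max_bytes * pct_size.value_or(100.0) / 100.0),
      .target_objects
      = uint32_t(max_objects * pct_objects.value_or(100.0) / 100.0)};
}

// Example: a 10 GiB cache with cloud_storage_cache_trim_threshold_percent_size
// set to 80.0 would start a background trim once usage crosses 8 GiB.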
14 changes: 13 additions & 1 deletion src/v/cloud_storage/cache_service.h
@@ -24,8 +24,11 @@
#include <seastar/core/gate.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/thread.hh>

#include <filesystem>
#include <iterator>
#include <optional>
#include <set>
#include <string_view>

@@ -242,7 +245,16 @@ class cache : public ss::peering_sharded_service<cache> {
std::optional<std::chrono::milliseconds> get_trim_delay() const;

/// Invoke trim, waiting if not enough time passed since the last trim
ss::future<> trim_throttled();
ss::future<> trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

// Take the cleanup semaphore before calling trim_throttled
ss::future<> trim_throttled(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

void maybe_background_trim();

/// Whether an objects path makes it impervious to pinning, like
/// the access time tracker.
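
The header now splits trimming into a locking entry point and an _unlocked variant. A minimal sketch of that pattern, with illustrative names and only the semaphore handling shown:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

namespace demo {

class trimmer {
public:
    // Public entry point: serializes trims by holding a semaphore unit.
    seastar::future<> trim_throttled() {
        auto units = co_await seastar::get_units(_cleanup_sm, 1);
        co_await trim_throttled_unlocked();
    }

    // Callers that already hold units (e.g. a background fiber that used
    // try_get_units) call this directly so the unit is not taken twice.
    seastar::future<> trim_throttled_unlocked() {
        // ... perform the actual trim ...
        co_return;
    }

private:
    seastar::semaphore _cleanup_sm{1};
};

} // namespace demo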
39 changes: 39 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
@@ -663,3 +663,42 @@ FIXTURE_TEST(bucketing_works_with_old_objects, cache_test_fixture) {
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get());
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2_rehashed).native()).get());
}

FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) {
std::string write_buf(1_KiB, ' ');
random_generators::fill_buffer_randomchars(
write_buf.data(), write_buf.size());
size_t num_objects = 100;
std::vector<std::filesystem::path> object_keys;
for (int i = 0; i < num_objects; i++) {
object_keys.emplace_back(fmt::format("test_{}.log.1", i));
std::ofstream segment{CACHE_DIR / object_keys.back()};
segment.write(
write_buf.data(), static_cast<std::streamsize>(write_buf.size()));
segment.flush();
}

// Account for all the files we just wrote (100 objects).
clean_up_at_start().get();
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

for (const auto& key : object_keys) {
// Touch every object so they have access times assigned to them
auto item = sharded_cache.local().get(key).get();
item->body.close().get();
}
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

set_trim_thresholds(100.0, 50.0, 100);

// Do a put: it should trigger a background trim that reduces the object
// count to 40, and the object added by the put itself brings it to 41.
auto data_string = create_data_string('a', 1_KiB);
put_into_cache(data_string, KEY);
wait_for_trim();

// 41 because we set a trigger of 50%: trim starts once the cache reaches
// 50% of the object limit, and the low-water mark scales that target by a
// further 80% (50% * 80% = 40%), plus one for the object we just added.
BOOST_REQUIRE_EQUAL(get_object_count(), 41);
}
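
The expected final count follows from the arithmetic in the comment above; a small compile-time sanity check, assuming the 80% low-water adjustment that comment describes:

#include <cstdint>

constexpr uint32_t max_objects = 100;    // set via set_trim_thresholds(..., 100)
constexpr double trigger_pct = 50.0;     // ..._trim_threshold_percent_objects
constexpr double low_water_factor = 0.8; // assumed 80% adjustment applied by trim

// Trim fires at 50 objects and trims down to 80% of that target: 40 objects.
constexpr auto after_trim
  = uint32_t(max_objects * (trigger_pct / 100.0) * low_water_factor);
// The object added by the put itself brings the count back up to 41.
constexpr auto expected = after_trim + 1;

static_assert(after_trim == 40);
static_assert(expected == 41);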
44 changes: 44 additions & 0 deletions src/v/cloud_storage/tests/cache_test_fixture.h
@@ -14,17 +14,21 @@
#include "bytes/iobuf.h"
#include "cloud_storage/cache_service.h"
#include "config/property.h"
#include "test_utils/scoped_config.h"
#include "test_utils/tmp_dir.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>

#include <boost/filesystem/operations.hpp>

#include <chrono>
#include <cstdint>
#include <filesystem>
#include <optional>

using namespace std::chrono_literals;

@@ -123,6 +127,46 @@ class cache_test_fixture {
void trim_carryover(uint64_t size_limit, uint64_t object_limit) {
sharded_cache.local().trim_carryover(size_limit, object_limit).get();
}

void set_trim_thresholds(
double size_limit_percent,
double object_limit_percent,
uint32_t max_objects) {
cfg.get("cloud_storage_cache_trim_threshold_percent_size")
.set_value(std::make_optional<double>(size_limit_percent));
cfg.get("cloud_storage_cache_trim_threshold_percent_objects")
.set_value(std::make_optional<double>(object_limit_percent));

sharded_cache
.invoke_on(
ss::shard_id{0},
[max_objects](cloud_storage::cache& c) {
c._max_objects = config::mock_binding<uint32_t>(max_objects);
})
.get();
}

void wait_for_trim() {
// Waits for the cleanup semaphore to be available. This ensures that
// there are no trim operations in progress.
sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) {
auto units = ss::get_units(c._cleanup_sm, 1).get();
})
.get();
}

uint32_t get_object_count() {
return sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) { return c._current_cache_objects; })
.get();
}

scoped_config cfg;
};

} // namespace cloud_storage
18 changes: 18 additions & 0 deletions src/v/config/configuration.cc
@@ -2540,6 +2540,24 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0,
{.min = 0, .max = 1024})
, cloud_storage_cache_trim_threshold_percent_size(
*this,
"cloud_storage_cache_trim_threshold_percent_size",
"Trim is triggered when the cache reaches this percent of the maximum "
"cache size. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, cloud_storage_cache_trim_threshold_percent_objects(
*this,
"cloud_storage_cache_trim_threshold_percent_objects",
"Trim is triggered when the cache reaches this percent of the maximum "
"object count. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, superusers(
*this,
"superusers",
4 changes: 4 additions & 0 deletions src/v/config/configuration.h
@@ -453,6 +453,10 @@ struct configuration final : public config_store {
cloud_storage_chunk_eviction_strategy;
property<uint16_t> cloud_storage_chunk_prefetch;
bounded_property<uint32_t> cloud_storage_cache_num_buckets;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_size;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_objects;

one_or_many_property<ss::sstring> superusers;

