[v24.1.x] cloud: Add config options to trigger cache trim
Backport of PR #18756
jcipar committed Jun 12, 2024
1 parent 79866e5 commit f71f6c5
Showing 7 changed files with 239 additions and 14 deletions.
77 changes: 64 additions & 13 deletions src/v/cloud_storage/cache_service.cc
@@ -35,6 +35,7 @@
#include <algorithm>
#include <exception>
#include <filesystem>
#include <optional>
#include <stdexcept>
#include <string_view>

@@ -137,10 +138,7 @@ void cache::update_max_bytes() {
_max_percent());

if (_current_cache_size > _max_bytes) {
ssx::spawn_with_gate(_gate, [this]() {
return ss::with_semaphore(
_cleanup_sm, 1, [this]() { return trim_throttled(); });
});
ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); });
}
}

@@ -266,7 +264,9 @@ std::optional<std::chrono::milliseconds> cache::get_trim_delay() const {
}
}

ss::future<> cache::trim_throttled() {
ss::future<> cache::trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
// If we trimmed very recently then do not do it immediately:
// this reduces load and improves chance of currently promoted
// segments finishing their read work before we demote their
@@ -282,7 +282,15 @@ ss::future<> cache::trim_throttled() {
co_await ss::sleep_abortable(*trim_delay, _as);
}

co_await trim();
co_await trim(size_limit_override, object_limit_override);
}

ss::future<> cache::trim_throttled(
std::optional<uint64_t> size_limit_override,
std::optional<size_t> object_limit_override) {
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled_unlocked(
size_limit_override, object_limit_override);
}

ss::future<> cache::trim_manually(
@@ -1095,10 +1103,7 @@ ss::future<> cache::put(
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
{
auto units = co_await ss::get_units(_cleanup_sm, 1);
co_await trim_throttled();
}
co_await trim_throttled();

throw disk_full_error;
}
@@ -1384,9 +1389,55 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) {
co_return result;
}

void cache::maybe_background_trim() {
auto& trim_threshold_pct_objects
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_objects;
auto& trim_threshold_pct_size
= config::shard_local_cfg()
.cloud_storage_cache_trim_threshold_percent_size;
if (
!trim_threshold_pct_size.value().has_value()
&& !trim_threshold_pct_objects.value().has_value()) {
return;
}

uint64_t target_bytes = uint64_t(
_max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0);
uint32_t target_objects = uint32_t(
_max_objects() * trim_threshold_pct_objects.value().value_or(100.0)
/ 100.0);

bool bytes_over_limit = _current_cache_size + _reserved_cache_size
> target_bytes;
bool objects_over_limit = _current_cache_objects + _reserved_cache_objects
> target_objects;

if (bytes_over_limit || objects_over_limit) {
auto units = ss::try_get_units(_cleanup_sm, 1);
if (units.has_value()) {
vlog(cst_log.debug, "Spawning background trim");
ssx::spawn_with_gate(
_gate,
[this,
target_bytes,
target_objects,
u = std::move(units)]() mutable {
return trim_throttled_unlocked(target_bytes, target_objects)
.finally([u = std::move(u)] {});
});
} else {
vlog(
cst_log.debug, "Not spawning background trim: already started");
}
}
}

ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0");

maybe_background_trim();

if (may_reserve_space(bytes, objects)) {
// Fast path: space was available.
_reserved_cache_size += bytes;
@@ -1463,7 +1514,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// We want number of full trims to match number of carryover trims.
vlog(cst_log.debug, "Spawning background trim_throttled");
ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable {
return trim_throttled().finally([u = std::move(u)] {});
return trim_throttled_unlocked(std::nullopt, std::nullopt)
.finally([u = std::move(u)] {});
});
co_return;
}
@@ -1496,7 +1548,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) {
// After taking lock, there still isn't space: means someone
// else didn't take it and free space for us already, so we will
// do the trim.
co_await trim_throttled();
co_await trim_throttled_unlocked();
did_trim = true;
}

@@ -1644,5 +1696,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) {
co_await ss::recursive_touch_directory(cache_dir.string());
}
}

} // namespace cloud_storage
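
For orientation, the heart of maybe_background_trim() is a "start at most one background trim" pattern. Below is a minimal sketch of that pattern, assuming a Seastar build and the ssx::spawn_with_gate helper used in the patch; the background_trimmer type and its do_trim() body are illustrative placeholders, not the cache service's real code.

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>

#include "seastarx.h"        // provides the ss:: alias for seastar::
#include "ssx/future-util.h" // ssx::spawn_with_gate (header path assumed)

struct background_trimmer {
    ss::semaphore _cleanup_sm{1};
    ss::gate _gate;

    // Stand-in for the real trim work.
    ss::future<> do_trim() { co_return; }

    void maybe_start() {
        // try_get_units() fails immediately if a trim already holds the
        // semaphore, so at most one background trim is ever in flight.
        auto units = ss::try_get_units(_cleanup_sm, 1);
        if (!units.has_value()) {
            return;
        }
        // Moving the units into the final continuation keeps the semaphore
        // held until the spawned trim finishes; the gate ties the task's
        // lifetime to shutdown.
        ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable {
            return do_trim().finally([u = std::move(u)] {});
        });
    }
};
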
14 changes: 13 additions & 1 deletion src/v/cloud_storage/cache_service.h
@@ -24,8 +24,11 @@
#include <seastar/core/gate.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/thread.hh>

#include <filesystem>
#include <iterator>
#include <optional>
#include <set>
#include <string_view>

@@ -234,7 +237,16 @@ class cache : public ss::peering_sharded_service<cache> {
std::optional<std::chrono::milliseconds> get_trim_delay() const;

/// Invoke trim, waiting if not enough time passed since the last trim
ss::future<> trim_throttled();
ss::future<> trim_throttled_unlocked(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

// Take the cleanup semaphore before calling trim_throttled
ss::future<> trim_throttled(
std::optional<uint64_t> size_limit_override = std::nullopt,
std::optional<size_t> object_limit_override = std::nullopt);

void maybe_background_trim();

/// Whether an objects path makes it impervious to pinning, like
/// the access time tracker.
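
The pair of declarations above follows a common locked/unlocked split: trim_throttled() acquires the cleanup semaphore itself, while the _unlocked variant assumes the caller already holds it (for example because the units were taken with try_get_units() before spawning a background fiber). A generic sketch of the convention, with illustrative names rather than cache members:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

#include "seastarx.h"

// Caller must already hold the semaphore when invoking this.
ss::future<> do_work_unlocked() { co_return; }

ss::future<> do_work(ss::semaphore& sm) {
    auto units = co_await ss::get_units(sm, 1); // take the lock here...
    co_await do_work_unlocked();                // ...then run the unlocked body
    // units are released when they go out of scope
}
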
79 changes: 79 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
@@ -621,3 +621,82 @@ FIXTURE_TEST(test_cache_carryover_trim, cache_test_fixture) {
BOOST_REQUIRE_EQUAL(after_bytes, 0);
BOOST_REQUIRE_EQUAL(after_objects, 0);
}

FIXTURE_TEST(bucketing_works_with_old_objects, cache_test_fixture) {
auto data_string = create_data_string('a', 1_KiB);
const std::filesystem::path key1{"80000001/a/b/c/d/file1.txt"};
const std::filesystem::path key2{"80000002/a/b/c/d/file2.txt"};
const std::filesystem::path key2_rehashed{"2/a/b/c/d/file2.txt"};
put_into_cache(data_string, key1);

scoped_config cfg;
cfg.get("cloud_storage_cache_num_buckets").set_value((uint32_t)16);
// Now key2 should be mapped to "2/a/b/c/d/file2.txt"

ss::sleep(1s).get();
put_into_cache(data_string, key2);

BOOST_CHECK(ss::file_exists((CACHE_DIR / key1).native()).get());
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get());
BOOST_CHECK(ss::file_exists((CACHE_DIR / key2_rehashed).native()).get());

// check that GET works
auto r1 = sharded_cache.local().get(key1).get();
auto r2 = sharded_cache.local().get(key2).get();
BOOST_CHECK(r1.has_value());
BOOST_CHECK(r2.has_value());

// check is_cached
BOOST_CHECK(
sharded_cache.local().is_cached(key1).get()
== cache_element_status::available);
BOOST_CHECK(
sharded_cache.local().is_cached(key2).get()
== cache_element_status::available);

// check cache invalidation
sharded_cache.local().invalidate(key1).get();
sharded_cache.local().invalidate(key2).get();
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key1).native()).get());
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get());
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2_rehashed).native()).get());
}
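
The remapping this test expects is consistent with reducing the randomized hex prefix modulo the bucket count; that derivation is inferred from the expected path rather than stated by the patch:

#include <cstdint>

constexpr uint32_t prefix = 0x80000002;   // randomized prefix of key2
constexpr uint32_t num_buckets = 16;      // cloud_storage_cache_num_buckets set above
static_assert(prefix % num_buckets == 2); // matches the expected "2/a/b/c/d/file2.txt"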

FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) {
std::string write_buf(1_KiB, ' ');
random_generators::fill_buffer_randomchars(
write_buf.data(), write_buf.size());
size_t num_objects = 100;
std::vector<std::filesystem::path> object_keys;
for (size_t i = 0; i < num_objects; i++) {
object_keys.emplace_back(fmt::format("test_{}.log.1", i));
std::ofstream segment{CACHE_DIR / object_keys.back()};
segment.write(
write_buf.data(), static_cast<std::streamsize>(write_buf.size()));
segment.flush();
}

// Account for all files in the cache (100 objects, 100 KiB total).
clean_up_at_start().get();
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

for (const auto& key : object_keys) {
// Touch every object so they have access times assigned to them
auto item = sharded_cache.local().get(key).get();
item->body.close().get();
}
BOOST_REQUIRE_EQUAL(get_object_count(), 100);

set_trim_thresholds(100.0, 50.0, 100);

// Do a put, which should trigger a background trim that reduces the
// object count to 40; the newly added object then brings it to 41.
auto data_string = create_data_string('a', 1_KiB);
put_into_cache(data_string, KEY);
wait_for_trim();

// 41 because we set a trigger of 50%: trim starts at 50% of the object
// limit, and the low water mark scales that target by a further 80%
// (50% * 80% = 40%), leaving 40 objects, +1 for the object we just added.
BOOST_REQUIRE_EQUAL(get_object_count(), 41);
}
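
For reference, the expected count follows from the arithmetic in the comment above; the 80% low-water factor comes from that comment rather than being derived here:

#include <cstdint>

constexpr uint32_t max_objects = 100;                // set_trim_thresholds(100.0, 50.0, 100)
constexpr uint32_t trigger = max_objects * 50 / 100; // 50% object threshold -> trim starts at 50
constexpr uint32_t target = trigger * 80 / 100;      // low water mark: 80% of the trigger -> 40
static_assert(target + 1 == 41);                     // 40 survivors plus the object just put
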
44 changes: 44 additions & 0 deletions src/v/cloud_storage/tests/cache_test_fixture.h
@@ -13,18 +13,22 @@
#include "cloud_storage/cache_service.h"
#include "config/property.h"
#include "seastarx.h"
#include "test_utils/scoped_config.h"
#include "test_utils/tmp_dir.h"
#include "units.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>

#include <boost/filesystem/operations.hpp>

#include <chrono>
#include <cstdint>
#include <filesystem>
#include <optional>

using namespace std::chrono_literals;

@@ -123,6 +127,46 @@ class cache_test_fixture {
void trim_carryover(uint64_t size_limit, uint64_t object_limit) {
sharded_cache.local().trim_carryover(size_limit, object_limit).get();
}

void set_trim_thresholds(
double size_limit_percent,
double object_limit_percent,
uint32_t max_objects) {
cfg.get("cloud_storage_cache_trim_threshold_percent_size")
.set_value(std::make_optional<double>(size_limit_percent));
cfg.get("cloud_storage_cache_trim_threshold_percent_objects")
.set_value(std::make_optional<double>(object_limit_percent));

sharded_cache
.invoke_on(
ss::shard_id{0},
[max_objects](cloud_storage::cache& c) {
c._max_objects = config::mock_binding<uint32_t>(max_objects);
})
.get();
}

void wait_for_trim() {
// Waits for the cleanup semaphore to be available. This ensures that
// there are no trim operations in progress.
sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) {
auto units = ss::get_units(c._cleanup_sm, 1).get();
})
.get();
}

uint32_t get_object_count() {
return sharded_cache
.invoke_on(
ss::shard_id{0},
[](cloud_storage::cache& c) { return c._current_cache_objects; })
.get();
}

scoped_config cfg;
};

} // namespace cloud_storage
27 changes: 27 additions & 0 deletions src/v/config/configuration.cc
@@ -2386,6 +2386,33 @@ configuration::configuration()
"Number of chunks to prefetch ahead of every downloaded chunk",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0)
, cloud_storage_cache_num_buckets(
*this,
"cloud_storage_cache_num_buckets",
"Divide cloud storage cache across specified number of buckets. This "
"only works for objects with randomized prefixes. The names will not be "
"changed if the value is set to zero.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0,
{.min = 0, .max = 1024})
, cloud_storage_cache_trim_threshold_percent_size(
*this,
"cloud_storage_cache_trim_threshold_percent_size",
"Trim is triggered when the cache reaches this percent of the maximum "
"cache size. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, cloud_storage_cache_trim_threshold_percent_objects(
*this,
"cloud_storage_cache_trim_threshold_percent_objects",
"Trim is triggered when the cache reaches this percent of the maximum "
"object count. If this is unset, the default behavior"
"is to start trim when the cache is about 100\% full.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::nullopt,
{.min = 1.0, .max = 100.0})
, superusers(
*this,
"superusers",
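
To illustrate how these optional thresholds feed the trigger computed in maybe_background_trim(), here is a hedged sketch; the 10 GiB cache limit is an assumed figure for illustration, not something the patch sets:

#include <cstdint>
#include <optional>

constexpr uint64_t max_bytes = 10ULL << 30; // assumed 10 GiB cache size limit

uint64_t size_trigger(std::optional<double> trim_threshold_percent_size) {
    // Unset behaves like 100%: background trim starts only once the cache is full.
    return uint64_t(
      max_bytes * trim_threshold_percent_size.value_or(100.0) / 100.0);
}

// size_trigger(std::nullopt) == 10 GiB, size_trigger(50.0) == 5 GiB: setting the
// option to 50 starts a background trim once the cache is half full.
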
5 changes: 5 additions & 0 deletions src/v/config/configuration.h
@@ -432,6 +432,11 @@ struct configuration final : public config_store {
enum_property<model::cloud_storage_chunk_eviction_strategy>
cloud_storage_chunk_eviction_strategy;
property<uint16_t> cloud_storage_chunk_prefetch;
bounded_property<uint32_t> cloud_storage_cache_num_buckets;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_size;
bounded_property<std::optional<double>, numeric_bounds>
cloud_storage_cache_trim_threshold_percent_objects;

one_or_many_property<ss::sstring> superusers;

7 changes: 7 additions & 0 deletions src/v/config/property.h
@@ -458,6 +458,8 @@ class binding : public binding_base<T> {
friend class mock_property<T>;
template<typename U>
friend inline binding<U> mock_binding(U&&);
template<typename U>
friend inline binding<U> mock_binding(U const&);
};

/**
@@ -474,6 +476,11 @@ inline binding<T> mock_binding(T&& value) {
return binding<T>(std::forward<T>(value));
}

template<typename T>
inline binding<T> mock_binding(T const& value) {
return binding<T>(T(value));
}

/**
* A conversion property binding contains the result of application of
* a conversion function to property value. The result is update in-place
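
The added const-lvalue overload matters for calls like the one in the cache test fixture: with the template argument spelled out, the existing mock_binding(T&&) takes a plain rvalue reference and cannot bind an lvalue such as the fixture's max_objects parameter. A small illustration (configured_max is a hypothetical variable; only config::mock_binding comes from this tree):

#include "config/property.h"

#include <cstdint>

void mock_binding_overload_example() {
    uint32_t configured_max = 100; // hypothetical value a test wants to inject

    // Previously an lvalue had to be copied or moved into a temporary first:
    auto moved_in = config::mock_binding<uint32_t>(uint32_t{configured_max});

    // With the new overload the lvalue can be passed directly and is copied:
    auto copied_in = config::mock_binding<uint32_t>(configured_max);

    (void)moved_in;
    (void)copied_in;
}
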
