From f71f6c501d707d34d5b25667455990daf4adfe9b Mon Sep 17 00:00:00 2001 From: Jim Cipar Date: Fri, 31 May 2024 17:49:33 -0400 Subject: [PATCH] [v24.1.x] cloud: Add config options to trigger cache trim Backport of PR #18756 --- src/v/cloud_storage/cache_service.cc | 77 +++++++++++++++--- src/v/cloud_storage/cache_service.h | 14 +++- src/v/cloud_storage/tests/cache_test.cc | 79 +++++++++++++++++++ .../cloud_storage/tests/cache_test_fixture.h | 44 +++++++++++ src/v/config/configuration.cc | 27 +++++++ src/v/config/configuration.h | 5 ++ src/v/config/property.h | 7 ++ 7 files changed, 239 insertions(+), 14 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 0ca0f6cf441f5..291c7fc6869c3 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -137,10 +138,7 @@ void cache::update_max_bytes() { _max_percent()); if (_current_cache_size > _max_bytes) { - ssx::spawn_with_gate(_gate, [this]() { - return ss::with_semaphore( - _cleanup_sm, 1, [this]() { return trim_throttled(); }); - }); + ssx::spawn_with_gate(_gate, [this]() { return trim_throttled(); }); } } @@ -266,7 +264,9 @@ std::optional cache::get_trim_delay() const { } } -ss::future<> cache::trim_throttled() { +ss::future<> cache::trim_throttled_unlocked( + std::optional size_limit_override, + std::optional object_limit_override) { // If we trimmed very recently then do not do it immediately: // this reduces load and improves chance of currently promoted // segments finishing their read work before we demote their @@ -282,7 +282,15 @@ ss::future<> cache::trim_throttled() { co_await ss::sleep_abortable(*trim_delay, _as); } - co_await trim(); + co_await trim(size_limit_override, object_limit_override); +} + +ss::future<> cache::trim_throttled( + std::optional size_limit_override, + std::optional object_limit_override) { + auto units = co_await 
ss::get_units(_cleanup_sm, 1); + co_await trim_throttled_unlocked( + size_limit_override, object_limit_override); } ss::future<> cache::trim_manually( @@ -1095,10 +1103,7 @@ ss::future<> cache::put( // Trim proactively: if many fibers hit this concurrently, // they'll contend for cleanup_sm and the losers will skip // trim due to throttling. - { - auto units = co_await ss::get_units(_cleanup_sm, 1); - co_await trim_throttled(); - } + co_await trim_throttled(); throw disk_full_error; } @@ -1384,9 +1389,55 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { co_return result; } +void cache::maybe_background_trim() { + auto& trim_threshold_pct_objects + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_objects; + auto& trim_threshold_pct_size + = config::shard_local_cfg() + .cloud_storage_cache_trim_threshold_percent_size; + if ( + !trim_threshold_pct_size.value().has_value() + && !trim_threshold_pct_objects.value().has_value()) { + return; + } + + uint64_t target_bytes = uint64_t( + _max_bytes * trim_threshold_pct_size.value().value_or(100.0) / 100.0); + uint32_t target_objects = uint32_t( + _max_objects() * trim_threshold_pct_objects.value().value_or(100.0) + / 100.0); + + bool bytes_over_limit = _current_cache_size + _reserved_cache_size + > target_bytes; + bool objects_over_limit = _current_cache_objects + _reserved_cache_objects + > target_objects; + + if (bytes_over_limit || objects_over_limit) { + auto units = ss::try_get_units(_cleanup_sm, 1); + if (units.has_value()) { + vlog(cst_log.debug, "Spawning background trim"); + ssx::spawn_with_gate( + _gate, + [this, + target_bytes, + target_objects, + u = std::move(units)]() mutable { + return trim_throttled_unlocked(target_bytes, target_objects) + .finally([u = std::move(u)] {}); + }); + } else { + vlog( + cst_log.debug, "Not spawning background trim: already started"); + } + } +} + ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { 
vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0"); + maybe_background_trim(); + if (may_reserve_space(bytes, objects)) { // Fast path: space was available. _reserved_cache_size += bytes; @@ -1463,7 +1514,8 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { // We want number of full trims to match number of carryover trims. vlog(cst_log.debug, "Spawning background trim_throttled"); ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable { - return trim_throttled().finally([u = std::move(u)] {}); + return trim_throttled_unlocked(std::nullopt, std::nullopt) + .finally([u = std::move(u)] {}); }); co_return; } @@ -1496,7 +1548,7 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { // After taking lock, there still isn't space: means someone // else didn't take it and free space for us already, so we will // do the trim. - co_await trim_throttled(); + co_await trim_throttled_unlocked(); did_trim = true; } @@ -1644,5 +1696,4 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) { co_await ss::recursive_touch_directory(cache_dir.string()); } } - } // namespace cloud_storage diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 76753a8ee14af..e86f76cfd6c3a 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -24,8 +24,11 @@ #include #include #include +#include #include +#include +#include #include #include @@ -234,7 +237,16 @@ class cache : public ss::peering_sharded_service { std::optional get_trim_delay() const; /// Invoke trim, waiting if not enough time passed since the last trim - ss::future<> trim_throttled(); + ss::future<> trim_throttled_unlocked( + std::optional size_limit_override = std::nullopt, + std::optional object_limit_override = std::nullopt); + + // Take the cleanup semaphore, then call trim_throttled_unlocked + ss::future<> trim_throttled( + std::optional size_limit_override = std::nullopt, 
std::optional object_limit_override = std::nullopt); + + void maybe_background_trim(); /// Whether an objects path makes it impervious to pinning, like /// the access time tracker. diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 15076c5afb521..70fa0820de6c9 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -621,3 +621,82 @@ FIXTURE_TEST(test_cache_carryover_trim, cache_test_fixture) { BOOST_REQUIRE_EQUAL(after_bytes, 0); BOOST_REQUIRE_EQUAL(after_objects, 0); } + +FIXTURE_TEST(bucketing_works_with_old_objects, cache_test_fixture) { + auto data_string = create_data_string('a', 1_KiB); + const std::filesystem::path key1{"80000001/a/b/c/d/file1.txt"}; + const std::filesystem::path key2{"80000002/a/b/c/d/file2.txt"}; + const std::filesystem::path key2_rehashed{"2/a/b/c/d/file2.txt"}; + put_into_cache(data_string, key1); + + scoped_config cfg; + cfg.get("cloud_storage_cache_num_buckets").set_value((uint32_t)16); + // Now key2 should be mapped to "2/a/b/c/d/file2.txt" + + ss::sleep(1s).get(); + put_into_cache(data_string, key2); + + BOOST_CHECK(ss::file_exists((CACHE_DIR / key1).native()).get()); + BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get()); + BOOST_CHECK(ss::file_exists((CACHE_DIR / key2_rehashed).native()).get()); + + // check that GET works + auto r1 = sharded_cache.local().get(key1).get(); + auto r2 = sharded_cache.local().get(key2).get(); + BOOST_CHECK(r1.has_value()); + BOOST_CHECK(r2.has_value()); + + // check is_cached + BOOST_CHECK( + sharded_cache.local().is_cached(key1).get() + == cache_element_status::available); + BOOST_CHECK( + sharded_cache.local().is_cached(key2).get() + == cache_element_status::available); + + // check cache invalidation + sharded_cache.local().invalidate(key1).get(); + sharded_cache.local().invalidate(key2).get(); + BOOST_CHECK(!ss::file_exists((CACHE_DIR / key1).native()).get()); + 
BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get()); + BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2_rehashed).native()).get()); +} + +FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) { + std::string write_buf(1_KiB, ' '); + random_generators::fill_buffer_randomchars( + write_buf.data(), write_buf.size()); + size_t num_objects = 100; + std::vector object_keys; + for (int i = 0; i < num_objects; i++) { + object_keys.emplace_back(fmt::format("test_{}.log.1", i)); + std::ofstream segment{CACHE_DIR / object_keys.back()}; + segment.write( + write_buf.data(), static_cast(write_buf.size())); + segment.flush(); + } + + // Account all files in the cache (100 KiB). + clean_up_at_start().get(); + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + for (const auto& key : object_keys) { + // Touch every object so they have access times assigned to them + auto item = sharded_cache.local().get(key).get(); + item->body.close().get(); + } + BOOST_REQUIRE_EQUAL(get_object_count(), 100); + + set_trim_thresholds(100.0, 50.0, 100); + + // Do a put which should trigger a background trim and reduce the + // object count to 40, followed by a put that will increase it to 41. + auto data_string = create_data_string('a', 1_KiB); + put_into_cache(data_string, KEY); + wait_for_trim(); + + // 41 because we set a trigger of 50. That means that trim is triggered at + // 50%, but the low water mark adjusts this by an additional 80%, 50% * 80% + // = 40%. +1 for the object we just added. 
+ BOOST_REQUIRE_EQUAL(get_object_count(), 41); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 7ac9ae3390750..7ee3a969a1481 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -13,18 +13,22 @@ #include "cloud_storage/cache_service.h" #include "config/property.h" #include "seastarx.h" +#include "test_utils/scoped_config.h" #include "test_utils/tmp_dir.h" #include "units.h" #include #include +#include #include #include #include #include +#include #include +#include using namespace std::chrono_literals; @@ -123,6 +127,46 @@ class cache_test_fixture { void trim_carryover(uint64_t size_limit, uint64_t object_limit) { sharded_cache.local().trim_carryover(size_limit, object_limit).get(); } + + void set_trim_thresholds( + double size_limit_percent, + double object_limit_percent, + uint32_t max_objects) { + cfg.get("cloud_storage_cache_trim_threshold_percent_size") + .set_value(std::make_optional(size_limit_percent)); + cfg.get("cloud_storage_cache_trim_threshold_percent_objects") + .set_value(std::make_optional(object_limit_percent)); + + sharded_cache + .invoke_on( + ss::shard_id{0}, + [max_objects](cloud_storage::cache& c) { + c._max_objects = config::mock_binding(max_objects); + }) + .get(); + } + + void wait_for_trim() { + // Waits for the cleanup semaphore to be available. This ensures that + // there are no trim operations in progress. 
+ sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { + auto units = ss::get_units(c._cleanup_sm, 1).get(); + }) + .get(); + } + + uint32_t get_object_count() { + return sharded_cache + .invoke_on( + ss::shard_id{0}, + [](cloud_storage::cache& c) { return c._current_cache_objects; }) + .get(); + } + + scoped_config cfg; }; } // namespace cloud_storage diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 245173bafb911..e04d38c8d4e0a 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2386,6 +2386,33 @@ configuration::configuration() "Number of chunks to prefetch ahead of every downloaded chunk", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 0) + , cloud_storage_cache_num_buckets( + *this, + "cloud_storage_cache_num_buckets", + "Divide cloud storage cache across specified number of buckets. This " + "only works for objects with randomized prefixes. The names will not be " + "changed if the value is set to zero.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 0, + {.min = 0, .max = 1024}) + , cloud_storage_cache_trim_threshold_percent_size( + *this, + "cloud_storage_cache_trim_threshold_percent_size", + "Trim is triggered when the cache reaches this percent of the maximum " + "cache size. If this is unset, the default behavior " + "is to start trim when the cache is about 100% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) + , cloud_storage_cache_trim_threshold_percent_objects( + *this, + "cloud_storage_cache_trim_threshold_percent_objects", + "Trim is triggered when the cache reaches this percent of the maximum " + "object count. 
If this is unset, the default behavior " + "is to start trim when the cache is about 100% full.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + std::nullopt, + {.min = 1.0, .max = 100.0}) , superusers( *this, "superusers", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index ab61bf9e79728..3626d2e15f09e 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -432,6 +432,11 @@ struct configuration final : public config_store { enum_property cloud_storage_chunk_eviction_strategy; property cloud_storage_chunk_prefetch; + bounded_property cloud_storage_cache_num_buckets; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_size; + bounded_property, numeric_bounds> + cloud_storage_cache_trim_threshold_percent_objects; one_or_many_property superusers; diff --git a/src/v/config/property.h b/src/v/config/property.h index 356acd4de1d9f..9a17edceacdc1 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -458,6 +458,8 @@ class binding : public binding_base { friend class mock_property; template friend inline binding mock_binding(U&&); + template + friend inline binding mock_binding(U const&); }; /** @@ -474,6 +476,11 @@ inline binding mock_binding(T&& value) { return binding(std::forward(value)); } +template +inline binding mock_binding(T const& value) { + return binding(T(value)); +} + /** * A conversion property binding contains the result of application of * a conversion function to property value. The result is update in-place