From 42935131d75d9c0d9e05b68a0f823fb5b9d0c469 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Mon, 3 Jun 2024 10:18:55 -0400 Subject: [PATCH 1/3] config: Add new configuration options for cache_service The configuration option enables bucketing of the segment and manifest files using different prefixes. --- src/v/config/configuration.cc | 9 +++++++++ src/v/config/configuration.h | 1 + 2 files changed, 10 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 0de6b13a781f..f6d33d87d85b 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2524,6 +2524,15 @@ configuration::configuration() "Number of chunks to prefetch ahead of every downloaded chunk", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 0) + , cloud_storage_cache_num_buckets( + *this, + "cloud_storage_cache_num_buckets", + "Divide cloud storage cache across specified number of buckets. This " + "only works for objects with randomized prefixes. The names will not be " + "changed if the value is set to zero.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 0, + {.min = 0, .max = 1024}) , superusers( *this, "superusers", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index b4d12071bc91..4ffca420805c 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -451,6 +451,7 @@ struct configuration final : public config_store { enum_property cloud_storage_chunk_eviction_strategy; property cloud_storage_chunk_prefetch; + bounded_property cloud_storage_cache_num_buckets; one_or_many_property superusers; From be39e81945c7169560a29dc61d04b289738437e3 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Mon, 3 Jun 2024 10:19:55 -0400 Subject: [PATCH 2/3] cloud_storage: Add cloud storage cache flattening Object names that start with a randomized prefix are now subject to a rehashing procedure. 
The prefix is replaced with another prefix which is computed from the hash.
The new prefix has lower cardinality. Only prefixes that consist of exactly
8 hex digits are rehashed. During the lookup we probe two paths, the
rehashed one and the original one, because the cache might store objects
from before this feature was enabled.
---
 src/v/cloud_storage/cache_service.cc | 154 ++++++++++++++++++++++++++-
 src/v/cloud_storage/cache_service.h  |  10 ++
 2 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc
index 27ed0315173a..34e68f5f3399 100644
--- a/src/v/cloud_storage/cache_service.cc
+++ b/src/v/cloud_storage/cache_service.cc
@@ -954,7 +954,129 @@ ss::future<> cache::stop() {
     co_await _gate.close();
 }
 
+static constexpr uint32_t expected_rand_prefix_length = 8;
+
+/// Parse a string that consists solely of hexadecimal digits.
+///
+/// Unlike std::stoull this rejects leading whitespace, a sign, a "0x" prefix
+/// and trailing non-hex characters, so "12zzzzzz" is not parsed as 0x12.
+///
+/// \return the parsed value, or std::nullopt if the string is empty, has more
+/// than 16 digits (the value wouldn't fit into uint64_t) or contains a
+/// character that is not a hex digit.
+static std::optional<uint64_t> parse_hex_prefix(const std::string& s) {
+    constexpr size_t max_hex_digits = 16;
+    if (s.empty() || s.size() > max_hex_digits) {
+        return std::nullopt;
+    }
+    uint64_t value = 0;
+    for (const char c : s) {
+        uint64_t digit = 0;
+        if (c >= '0' && c <= '9') {
+            digit = static_cast<uint64_t>(c - '0');
+        } else if (c >= 'a' && c <= 'f') {
+            digit = static_cast<uint64_t>(c - 'a') + 10;
+        } else if (c >= 'A' && c <= 'F') {
+            digit = static_cast<uint64_t>(c - 'A') + 10;
+        } else {
+            return std::nullopt;
+        }
+        value = (value << 4U) | digit;
+    }
+    return value;
+}
+
+/// Replace the randomized prefix of an object name with a bucket index.
+///
+/// A name of the form "8-hex-digit-prefix/namespace/topic/...etc" is turned
+/// into "N/namespace/topic/...etc" where N is the prefix value modulo
+/// cloud_storage_cache_num_buckets, printed in decimal.
+///
+/// \return the rehashed name, or std::nullopt if the path is empty, the
+/// number of buckets is zero, or the first path component is not an
+/// 8-character hex integer.
+static std::optional<std::filesystem::path>
+rehash_object_name(const std::filesystem::path& p) {
+    if (p.empty()) {
+        return std::nullopt;
+    }
+    const auto num_buckets
+      = config::shard_local_cfg().cloud_storage_cache_num_buckets();
+    if (num_buckets == 0) {
+        return std::nullopt;
+    }
+    auto it = p.begin();
+    // Copy the first component: a reference obtained from a path iterator is
+    // not guaranteed to stay valid once the iterator is incremented.
+    const auto prefix = it->native();
+    ++it;
+    if (prefix.size() != expected_rand_prefix_length) {
+        // The name doesn't match the pattern
+        // "8-char-prefix/namespace/topic/...etc" so we shouldn't rehash it.
+        return std::nullopt;
+    }
+    const auto hash = parse_hex_prefix(prefix);
+    if (!hash.has_value()) {
+        // The first component of the name is not a hex integer
+        return std::nullopt;
+    }
+    std::filesystem::path result(fmt::format("{}", *hash % num_buckets));
+    for (; it != p.end(); ++it) {
+        result /= *it;
+    }
+    return result;
+}
+
+/// Build the list of names under which an object may be stored in the cache.
+///
+/// The original \p key is always the first element. If the key can be
+/// rehashed, the rehashed name is appended and therefore is the last
+/// element. \p operation_name is only used in the log message.
+static std::vector<std::filesystem::path> make_candidate_object_names(
+  const std::filesystem::path& key, const char* operation_name) {
+    std::vector<std::filesystem::path> keys = {key};
+    // The original name stays in the list because the cache might store
+    // objects from before bucketing was enabled, so both old and new objects
+    // can be found. The opposite transition is lossy: if bucketing was
+    // enabled and then disabled, objects stored under rehashed names become
+    // inaccessible. They will be evicted and the cache will be repopulated
+    // eventually.
+    //
+    // rehash_object_name returns std::nullopt when the number of buckets is
+    // zero, so no separate check of the config option is needed here.
+    auto rehashed = rehash_object_name(key);
+    if (rehashed.has_value()) {
+        vlog(
+          cst_log.debug,
+          "{} object name {} converted to {}",
+          operation_name,
+          key,
+          *rehashed);
+        keys.emplace_back(std::move(*rehashed));
+    }
+    return keys;
+}
+
 ss::future<std::optional<cache_item>> cache::get(std::filesystem::path key) {
+    // Probe every name the object may be stored under, stop at the first hit
+    auto keys = make_candidate_object_names(key, "get");
+    std::optional<cache_item> result;
+    for (auto& k : keys) {
+        // 'keys' is not used after the loop so its elements can be moved out
+        result = co_await _get(std::move(k));
+        if (result.has_value()) {
+            break;
+        }
+    }
+    if (result.has_value()) {
+        probe.cached_get();
+    } else {
+        probe.miss_get();
+    }
+    co_return std::move(result);
+}
+
+ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) {
     auto guard = _gate.hold();
     vlog(cst_log.debug, "Trying to get {} from archival cache.", key.native());
     probe.get();
@@ -996,8 +1118,14 @@ ss::future<> cache::put(
   ss::io_priority_class io_priority,
   size_t write_buffer_size,
   unsigned int write_behind) {
-    auto guard = _gate.hold();
     vlog(cst_log.debug, "Trying to put {} to archival cache.", key.native());
+
+    // New objects are stored under the rehashed name when there is one;
+    // make_candidate_object_names always puts it last.
+    auto keys = make_candidate_object_names(key, "put");
+    key = std::move(keys.back());
+
+    auto guard = _gate.hold();
     probe.put();
 
     std::filesystem::path normal_cache_dir = _cache_dir.lexically_normal();
@@ -1122,6 +1250,21 @@ ss::future<> cache::put(
 
 ss::future<cache_element_status>
 cache::is_cached(const std::filesystem::path& key) {
+    // The object is reported as cached if any of its candidate names is found
+    const auto keys = make_candidate_object_names(key, "is_cached");
+    auto result = cache_element_status::not_available;
+    for (const auto& k : keys) {
+        result = co_await _is_cached(k);
+        if (result != cache_element_status::not_available) {
+            break;
+        }
+    }
+    co_return result;
+}
+
+ss::future<cache_element_status>
+cache::_is_cached(const std::filesystem::path& key) {
     auto guard = _gate.hold();
     vlog(cst_log.debug, "Checking {} in archival cache.", key.native());
     if (_files_in_progress.contains(key)) {
@@ -1137,6 +1279,16 @@ cache::is_cached(const std::filesystem::path& key) {
 }
 
 ss::future<> cache::invalidate(const std::filesystem::path& key) {
+    const auto keys = make_candidate_object_names(key, "invalidate");
+    for (const auto& k : keys) {
+        // We shouldn't stop invalidating if we actually deleted the file
+        // because cache may store two files, one with old-style name and
+        // another one with new-style name.
+        co_await _invalidate(k);
+    }
+}
+
+ss::future<> cache::_invalidate(const std::filesystem::path& key) {
     auto guard = _gate.hold();
     vlog(
       cst_log.debug,
diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h
index 7a44fdb5f79d..b0f2f45f3d97 100644
--- a/src/v/cloud_storage/cache_service.h
+++ b/src/v/cloud_storage/cache_service.h
@@ -216,6 +216,16 @@ class cache : public ss::peering_sharded_service<cache> {
         bool trim_missed_tmp_files{false};
     };
 
+    /// Look up a single object name in the cache, without rehashing it
+    ss::future<std::optional<cache_item>> _get(std::filesystem::path key);
+
+    /// Remove a single object name from the cache, without rehashing it
+    ss::future<> _invalidate(const std::filesystem::path& key);
+
+    /// Check the status of a single object name, without rehashing it
+    ss::future<cache_element_status>
+    _is_cached(const std::filesystem::path& key);
+
     /// Ordinary trim: prioritze trimming data chunks, only delete indices etc
     /// if all their chunks are dropped.
     ss::future trim_fast(

From 46609dccb69d315f132fe6d6a99c70b96fe4936a Mon Sep 17 00:00:00 2001
From: Evgeny Lazin <4lazin@gmail.com>
Date: Mon, 3 Jun 2024 11:50:27 -0400
Subject: [PATCH 3/3] cloud_storage: Add tests for flattened cache

---
 src/v/cloud_storage/tests/cache_test.cc | 42 +++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc
index ca7447589871..ef4c86ff4e5d 100644
--- a/src/v/cloud_storage/tests/cache_test.cc
+++ b/src/v/cloud_storage/tests/cache_test.cc
@@ -18,6 +18,7 @@
 #include "random/generators.h"
 #include "ssx/sformat.h"
 #include "test_utils/fixture.h"
+#include "test_utils/scoped_config.h"
 #include "utils/file_io.h"
 #include "utils/human.h"
 
@@ -31,6 +32,7 @@
 
 #include
 #include
+#include
 #include
 
 using namespace cloud_storage;
@@ -621,3 +623,43 @@ FIXTURE_TEST(test_cache_carryover_trim, cache_test_fixture) {
     BOOST_REQUIRE_EQUAL(after_bytes, 0);
     BOOST_REQUIRE_EQUAL(after_objects, 0);
 }
+
+FIXTURE_TEST(bucketing_works_with_old_objects, cache_test_fixture) {
+    auto data_string = create_data_string('a', 1_KiB);
+    const std::filesystem::path key1{"80000001/a/b/c/d/file1.txt"};
+    const std::filesystem::path key2{"80000002/a/b/c/d/file2.txt"};
+    const std::filesystem::path key2_rehashed{"2/a/b/c/d/file2.txt"};
+    put_into_cache(data_string, key1);
+
+    scoped_config cfg;
+    cfg.get("cloud_storage_cache_num_buckets").set_value((uint32_t)16);
+    // 0x80000002 % 16 == 2, so key2 is now stored as "2/a/b/c/d/file2.txt"
+
+    ss::sleep(1s).get();
+    put_into_cache(data_string, key2);
+
+    BOOST_CHECK(ss::file_exists((CACHE_DIR / key1).native()).get());
+    BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get());
+    BOOST_CHECK(ss::file_exists((CACHE_DIR / key2_rehashed).native()).get());
+
+    // check that GET finds both the old-style and the rehashed object
+    auto r1 = sharded_cache.local().get(key1).get();
+    auto r2 = sharded_cache.local().get(key2).get();
+    BOOST_CHECK(r1.has_value());
+    BOOST_CHECK(r2.has_value());
+
+    // check that is_cached reports both objects as available
+    BOOST_CHECK(
+      sharded_cache.local().is_cached(key1).get()
+      == cache_element_status::available);
+    BOOST_CHECK(
+      sharded_cache.local().is_cached(key2).get()
+      == cache_element_status::available);
+
+    // check that invalidation removes the files under every name
+    sharded_cache.local().invalidate(key1).get();
+    sharded_cache.local().invalidate(key2).get();
+    BOOST_CHECK(!ss::file_exists((CACHE_DIR / key1).native()).get());
+    BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2).native()).get());
+    BOOST_CHECK(!ss::file_exists((CACHE_DIR / key2_rehashed).native()).get());
+}