Skip to content

Commit

Permalink
Merge pull request #18762 from Lazin/fix/slow-trim-problem
Browse files Browse the repository at this point in the history
cloud_storage: Rehash segment name prefixes
  • Loading branch information
Lazin authored Jun 5, 2024
2 parents 3355fa7 + 46609dc commit bebd391
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 1 deletion.
109 changes: 108 additions & 1 deletion src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,85 @@ ss::future<> cache::stop() {
co_await _gate.close();
}

static constexpr uint32_t expected_rand_prefix_length = 8;

/// Translate an object name that starts with a randomized hex prefix into
/// its bucketed form.
///
/// The first path component must consist of exactly
/// `expected_rand_prefix_length` hexadecimal digits (either case). That
/// component is replaced by `hash % cloud_storage_cache_num_buckets`
/// printed in decimal; all remaining components are kept unchanged.
///
/// \param p object name, e.g. "8-char-prefix/namespace/topic/...etc"
/// \return the bucketed name, or std::nullopt when `p` is empty, bucketing
///         is disabled (`cloud_storage_cache_num_buckets` is zero), or the
///         first component is not a hex prefix of the expected length.
static std::optional<std::filesystem::path>
rehash_object_name(const std::filesystem::path& p) {
    if (p.empty()) {
        return std::nullopt;
    }
    const auto num_buckets
      = config::shard_local_cfg().cloud_storage_cache_num_buckets();

    if (num_buckets == 0) {
        return std::nullopt;
    }
    auto it = p.begin();
    // Copy the first component before the iterator is advanced.
    const auto prefix = it->native();
    ++it;
    if (prefix.size() != expected_rand_prefix_length) {
        // The name doesn't match the pattern
        // "8-char-prefix/namespace/topic/...etc" so we shouldn't rehash it.
        return std::nullopt;
    }
    // Validate and parse every character explicitly. std::stoull is too
    // lenient here: it skips leading whitespace, accepts a sign or a "0x"
    // prefix, and silently stops at the first non-hex character, so names
    // like "12zzzzzz/..." would be treated as if they had a hex prefix.
    uint64_t hash = 0;
    for (const auto c : prefix) {
        uint64_t digit = 0;
        if (c >= '0' && c <= '9') {
            digit = static_cast<uint64_t>(c - '0');
        } else if (c >= 'a' && c <= 'f') {
            digit = static_cast<uint64_t>(c - 'a') + 10;
        } else if (c >= 'A' && c <= 'F') {
            digit = static_cast<uint64_t>(c - 'A') + 10;
        } else {
            // The first component of the name is not a hex integer
            return std::nullopt;
        }
        hash = hash * 16 + digit;
    }
    auto bucket_ix = fmt::format("{}", hash % num_buckets);
    std::filesystem::path result(bucket_ix);
    for (; it != p.end(); ++it) {
        result /= *it;
    }
    return result;
}

/// Build the list of names under which `key` may be stored in the cache.
///
/// The first element is always `key` itself. If
/// `cloud_storage_cache_num_buckets` is non-zero and `key` can be rehashed,
/// the bucketed name is appended as the last element.
///
/// Effect of toggling the option:
///  - disabled -> enabled: both old-style and bucketed objects can be found;
///  - enabled -> disabled: bucketed objects become inaccessible; they will
///    be removed by cache eviction and the cache will be repopulated
///    eventually.
///
/// \param key object name as supplied by the caller
/// \param operation_name name of the calling operation, used for logging
static std::vector<std::filesystem::path> make_candidate_object_names(
  const std::filesystem::path& key, const char* operation_name) {
    std::vector<std::filesystem::path> candidates;
    candidates.push_back(key);

    if (config::shard_local_cfg().cloud_storage_cache_num_buckets() == 0) {
        // Bucketing is turned off, only the original name applies.
        return candidates;
    }

    auto bucketed = rehash_object_name(key);
    if (!bucketed.has_value()) {
        return candidates;
    }

    vlog(
      cst_log.debug,
      "{} object name {} converted to {}",
      operation_name,
      key,
      *bucketed);
    candidates.push_back(std::move(*bucketed));
    return candidates;
}

/// Look up `key` in the cache, trying every candidate name produced by
/// make_candidate_object_names() in order and stopping at the first hit.
/// Records a cached-get or a miss on the probe depending on the outcome.
ss::future<std::optional<cache_item>> cache::get(std::filesystem::path key) {
    auto candidates = make_candidate_object_names(key, "get");
    std::optional<cache_item> found;
    for (auto it = candidates.begin();
         it != candidates.end() && !found.has_value();
         ++it) {
        found = co_await _get(std::move(*it));
    }
    if (!found.has_value()) {
        probe.miss_get();
    } else {
        probe.cached_get();
    }
    co_return std::move(found);
}

ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) {
auto guard = _gate.hold();
vlog(cst_log.debug, "Trying to get {} from archival cache.", key.native());
probe.get();
Expand Down Expand Up @@ -996,8 +1074,12 @@ ss::future<> cache::put(
ss::io_priority_class io_priority,
size_t write_buffer_size,
unsigned int write_behind) {
auto guard = _gate.hold();
vlog(cst_log.debug, "Trying to put {} to archival cache.", key.native());

auto keys = make_candidate_object_names(key, "put");
key = keys.back();

auto guard = _gate.hold();
probe.put();

std::filesystem::path normal_cache_dir = _cache_dir.lexically_normal();
Expand Down Expand Up @@ -1122,6 +1204,20 @@ ss::future<> cache::put(

/// Report the cache status of `key`, checking every candidate name in
/// order. The first status other than `not_available` wins; if no
/// candidate is known to the cache the result is `not_available`.
ss::future<cache_element_status>
cache::is_cached(const std::filesystem::path& key) {
    const auto candidates = make_candidate_object_names(key, "is_cached");
    for (const auto& candidate : candidates) {
        const auto status = co_await _is_cached(candidate);
        if (status != cache_element_status::not_available) {
            co_return status;
        }
    }
    co_return cache_element_status::not_available;
}

ss::future<cache_element_status>
cache::_is_cached(const std::filesystem::path& key) {
auto guard = _gate.hold();
vlog(cst_log.debug, "Checking {} in archival cache.", key.native());
if (_files_in_progress.contains(key)) {
Expand All @@ -1137,6 +1233,17 @@ cache::is_cached(const std::filesystem::path& key) {
}

/// Invalidate `key` under every candidate name.
ss::future<> cache::invalidate(const std::filesystem::path& key) {
    const auto candidates = make_candidate_object_names(key, "invalidate");
    // Every candidate is processed even after a successful removal: the
    // cache may hold two files for the same object, one stored under the
    // old-style name and another one under the new-style name.
    for (auto it = candidates.cbegin(); it != candidates.cend(); ++it) {
        co_await _invalidate(*it);
    }
}

ss::future<> cache::_invalidate(const std::filesystem::path& key) {
auto guard = _gate.hold();
vlog(
cst_log.debug,
Expand Down
8 changes: 8 additions & 0 deletions src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ class cache : public ss::peering_sharded_service<cache> {
bool trim_missed_tmp_files{false};
};

/// Look up one concrete object name in the cache. get() calls this once
/// per candidate name (original and, when bucketing is enabled, rehashed).
ss::future<std::optional<cache_item>> _get(std::filesystem::path key);

/// Remove object from cache. invalidate() calls this for every candidate
/// name of the key.
ss::future<> _invalidate(const std::filesystem::path& key);

/// Check the status of one concrete object name. is_cached() calls this
/// per candidate name until a status other than `not_available` is found.
ss::future<cache_element_status>
_is_cached(const std::filesystem::path& key);

/// Ordinary trim: prioritize trimming data chunks, only delete indices etc
/// if all their chunks are dropped.
ss::future<trim_result> trim_fast(
Expand Down
42 changes: 42 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "random/generators.h"
#include "ssx/sformat.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"
#include "utils/file_io.h"
#include "utils/human.h"

Expand All @@ -31,6 +32,7 @@

#include <chrono>
#include <fstream>
#include <optional>
#include <stdexcept>

using namespace cloud_storage;
Expand Down Expand Up @@ -621,3 +623,43 @@ FIXTURE_TEST(test_cache_carryover_trim, cache_test_fixture) {
BOOST_REQUIRE_EQUAL(after_bytes, 0);
BOOST_REQUIRE_EQUAL(after_objects, 0);
}

// Verifies that objects stored before bucketing was enabled stay reachable
// under their old-style name, while objects stored afterwards are placed
// under the rehashed name and are reachable through the original key.
FIXTURE_TEST(bucketing_works_with_old_objects, cache_test_fixture) {
    auto payload = create_data_string('a', 1_KiB);
    const std::filesystem::path legacy_key{"80000001/a/b/c/d/file1.txt"};
    const std::filesystem::path bucketed_key{"80000002/a/b/c/d/file2.txt"};
    const std::filesystem::path bucketed_key_on_disk{"2/a/b/c/d/file2.txt"};

    // True if a file for `name` exists under the cache directory.
    auto on_disk = [&](const std::filesystem::path& name) {
        return ss::file_exists((CACHE_DIR / name).native()).get();
    };

    // Stored while bucketing is still disabled: keeps its old-style name.
    put_into_cache(payload, legacy_key);

    scoped_config cfg;
    cfg.get("cloud_storage_cache_num_buckets")
      .set_value(static_cast<uint32_t>(16));
    // From now on 0x80000002 % 16 == 2, so the second key is mapped to
    // "2/a/b/c/d/file2.txt".

    ss::sleep(1s).get();
    put_into_cache(payload, bucketed_key);

    BOOST_CHECK(on_disk(legacy_key));
    BOOST_CHECK(!on_disk(bucketed_key));
    BOOST_CHECK(on_disk(bucketed_key_on_disk));

    auto& cache = sharded_cache.local();

    // GET finds both the old-style and the bucketed object
    auto legacy_item = cache.get(legacy_key).get();
    auto bucketed_item = cache.get(bucketed_key).get();
    BOOST_CHECK(legacy_item.has_value());
    BOOST_CHECK(bucketed_item.has_value());

    // is_cached reports both as available
    BOOST_CHECK(
      cache.is_cached(legacy_key).get() == cache_element_status::available);
    BOOST_CHECK(
      cache.is_cached(bucketed_key).get() == cache_element_status::available);

    // invalidation removes the files regardless of the naming scheme
    cache.invalidate(legacy_key).get();
    cache.invalidate(bucketed_key).get();
    BOOST_CHECK(!on_disk(legacy_key));
    BOOST_CHECK(!on_disk(bucketed_key));
    BOOST_CHECK(!on_disk(bucketed_key_on_disk));
}
9 changes: 9 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2531,6 +2531,15 @@ configuration::configuration()
"Number of chunks to prefetch ahead of every downloaded chunk",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0)
, cloud_storage_cache_num_buckets(
*this,
"cloud_storage_cache_num_buckets",
"Divide cloud storage cache across specified number of buckets. This "
"only works for objects with randomized prefixes. The names will not be "
"changed if the value is set to zero.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0,
{.min = 0, .max = 1024})
, superusers(
*this,
"superusers",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ struct configuration final : public config_store {
enum_property<model::cloud_storage_chunk_eviction_strategy>
cloud_storage_chunk_eviction_strategy;
property<uint16_t> cloud_storage_chunk_prefetch;
bounded_property<uint32_t> cloud_storage_cache_num_buckets;

one_or_many_property<ss::sstring> superusers;

Expand Down

0 comments on commit bebd391

Please sign in to comment.