From eb194dfcd17c973b90f14abba076ece174ca9fb4 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Thu, 6 Jun 2024 10:08:16 +0530 Subject: [PATCH 1/5] cst/cache: Add file size to access time tracker * The access time tracker now also tracks file sizes. * The key to the tracker map is now the full file path instead of a hash * The serialized form of tracker now contains a version to distinguish between the old and new data structures. * The recursive directory walker uses tracker entries to avoid file stat operations. (cherry picked from commit d76659a9e2c510a6d70563be9b3e47761daaff11) --- src/v/cloud_storage/access_time_tracker.cc | 116 +++++++++++------- src/v/cloud_storage/access_time_tracker.h | 58 +++++---- src/v/cloud_storage/cache_service.cc | 32 ++--- .../recursive_directory_walker.cc | 23 ++-- src/v/cloud_storage/tests/cache_bench.cc | 2 +- src/v/cloud_storage/tests/cache_test.cc | 39 ++++-- 6 files changed, 160 insertions(+), 110 deletions(-) diff --git a/src/v/cloud_storage/access_time_tracker.cc b/src/v/cloud_storage/access_time_tracker.cc index 52829214f629d..6f78b1b7c79dc 100644 --- a/src/v/cloud_storage/access_time_tracker.cc +++ b/src/v/cloud_storage/access_time_tracker.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -51,20 +52,22 @@ namespace cloud_storage { // is defined by hand in the read/write methods so that it can be done // with streaming. struct table_header - : serde::envelope, serde::compat_version<0>> { + : serde::envelope, serde::compat_version<0>> { size_t table_size{0}; + tracker_version version{tracker_version::v1}; - auto serde_fields() { return std::tie(table_size); } + auto serde_fields() { return std::tie(table_size, version); } }; -ss::future<> access_time_tracker::write(ss::output_stream& out) { +ss::future<> access_time_tracker::write( + ss::output_stream& out, tracker_version version) { // This lock protects us from the _table being mutated while we // are iterating over it and yielding during the loop. 
auto lock_guard = co_await ss::get_units(_table_lock, 1); _dirty = false; - const table_header h{.table_size = _table.size()}; + const table_header h{.table_size = _table.size(), .version = version}; iobuf header_buf; serde::write(header_buf, h); co_await write_iobuf_to_output_stream(std::move(header_buf), out); @@ -74,11 +77,15 @@ ss::future<> access_time_tracker::write(ss::output_stream& out) { size_t i = 0; iobuf serialize_buf; - for (auto it : _table) { - serde::write(serialize_buf, it.first); - serde::write(serialize_buf, it.second); + for (const auto& [path, metadata] : _table) { + serde::write(serialize_buf, path); + serde::write(serialize_buf, metadata.atime_sec); + serde::write(serialize_buf, metadata.size); ++i; if (i % chunk_count == 0 || i == _table.size()) { + iobuf chunk_size; + serde::write(chunk_size, serialize_buf.size_bytes()); + co_await write_iobuf_to_output_stream(std::move(chunk_size), out); for (const auto& f : serialize_buf) { co_await out.write(f.get(), f.size()); } @@ -139,64 +146,81 @@ ss::future<> access_time_tracker::read(ss::input_stream& in) { auto tmp = co_await in.read_exactly(header_size); header_buf.append(tmp.get(), tmp.size()); auto h_parser = iobuf_parser(std::move(header_buf)); - table_header h = serde::read_nested(h_parser, 0); + auto h = serde::read_nested(h_parser, 0); - // How many items to consume per stream read() - constexpr size_t chunk_count = 2048; + auto defer = ss::defer([&] { + lock_guard.return_all(); + // Drop writes accumulated while reading + _pending_upserts.clear(); + }); + + // Skip loading data for older version + if (h.version == tracker_version::v1) { + co_return; + } - for (size_t i = 0; i < h.table_size; i += chunk_count) { - auto item_count = std::min(chunk_count, h.table_size - i); - auto tmp_buf = co_await in.read_exactly(item_count * table_item_size); + while (!in.eof()) { + auto chunk_sz_buf = co_await in.read_exactly(sizeof(size_t)); + if (chunk_sz_buf.empty() && in.eof()) { + break; + } + + 
iobuf chunk_sz; + chunk_sz.append(std::move(chunk_sz_buf)); + auto chunk_sz_parser = iobuf_parser{std::move(chunk_sz)}; + auto chunk_size = serde::read(chunk_sz_parser); + auto tmp_buf = co_await in.read_exactly(chunk_size); iobuf items_buf; items_buf.append(std::move(tmp_buf)); auto parser = iobuf_parser(std::move(items_buf)); - for (size_t j = 0; j < item_count; ++j) { - uint32_t hash = serde::read_nested(parser, 0); - timestamp_t t = serde::read_nested(parser, 0); - _table.emplace(hash, t); + while (parser.bytes_left() > 0) { + auto path = serde::read_nested(parser, 0); + auto atime = serde::read_nested(parser, 0); + auto size = serde::read_nested(parser, 0); + _table.emplace( + path, file_metadata{.atime_sec = atime, .size = size}); } } - lock_guard.return_all(); - // Any writes while we were reading are dropped - _pending_upserts.clear(); + vassert( + _table.size() == h.table_size, + "unexpected tracker size, loaded {} items, expected {} items", + _table.size(), + h.table_size); } -void access_time_tracker::add_timestamp( - std::string_view key, std::chrono::system_clock::time_point ts) { - if (!should_track(key)) { +void access_time_tracker::add( + ss::sstring path, std::chrono::system_clock::time_point atime, size_t size) { + if (!should_track(path)) { return; } - uint32_t seconds = std::chrono::time_point_cast(ts) + uint32_t seconds = std::chrono::time_point_cast(atime) .time_since_epoch() .count(); - uint32_t hash = xxhash_32(key.data(), key.size()); - auto units = seastar::try_get_units(_table_lock, 1); if (units.has_value()) { // Got lock, update main table - _table[hash] = seconds; + _table[path] = {.atime_sec = seconds, .size = size}; _dirty = true; } else { // Locked during serialization, defer write - _pending_upserts[hash] = seconds; + _pending_upserts[path] = {.atime_sec = seconds, .size = size}; } } -void access_time_tracker::remove_timestamp(std::string_view key) noexcept { +void access_time_tracker::remove(std::string_view key) noexcept { try { 
- uint32_t hash = xxhash_32(key.data(), key.size()); - + ss::sstring k{key.data(), key.size()}; auto units = seastar::try_get_units(_table_lock, 1); if (units.has_value()) { // Unlocked, update main table - _table.erase(hash); + _table.erase(k); _dirty = true; } else { // Locked during serialization, defer write - _pending_upserts[hash] = std::nullopt; + _pending_upserts[k] = std::nullopt; } } catch (...) { vassert( @@ -209,15 +233,15 @@ void access_time_tracker::remove_timestamp(std::string_view key) noexcept { ss::future<> access_time_tracker::trim(const fragmented_vector& existent) { - absl::btree_set existent_hashes; + absl::btree_set existent_hashes; for (const auto& i : existent) { - existent_hashes.insert(xxhash_32(i.path.data(), i.path.size())); + existent_hashes.insert(i.path); } auto lock_guard = co_await ss::get_units(_table_lock, 1); table_t tmp; - for (auto it : _table) { + for (const auto& it : _table) { if (existent_hashes.contains(it.first)) { tmp.insert(it); } @@ -233,18 +257,20 @@ access_time_tracker::trim(const fragmented_vector& existent) { on_released_table_lock(); } -std::optional -access_time_tracker::estimate_timestamp(std::string_view key) const { - uint32_t hash = xxhash_32(key.data(), key.size()); - auto it = _table.find(hash); - if (it == _table.end()) { - return std::nullopt; +std::optional +access_time_tracker::get(const std::string& key) const { + if (auto it = _table.find(key); it != _table.end()) { + return it->second; } - auto seconds = std::chrono::seconds(it->second); - std::chrono::system_clock::time_point ts(seconds); - return ts; + return std::nullopt; } bool access_time_tracker::is_dirty() const { return _dirty; } + +std::chrono::system_clock::time_point file_metadata::time_point() const { + return std::chrono::system_clock::time_point{ + std::chrono::seconds{atime_sec}}; +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/access_time_tracker.h b/src/v/cloud_storage/access_time_tracker.h index 
2680b6004d0e6..aa545c5032b75 100644 --- a/src/v/cloud_storage/access_time_tracker.h +++ b/src/v/cloud_storage/access_time_tracker.h @@ -27,50 +27,48 @@ namespace cloud_storage { -/// Access time tracker maintains map from filename hash to -/// the timestamp that represents the time when the file was -/// accessed last. -/// -/// It is possible to have conflicts. In case of conflict -/// 'add_timestamp' method will overwrite another key. For that -/// key we will observe larger access time. When one of the -/// conflicted entries will be deleted another will be deleted -/// as well. This is OK because the code in the -/// 'cloud_storage/cache_service' is ready for that. +enum class tracker_version : uint8_t { v1, v2 }; + +struct file_metadata { + uint32_t atime_sec; + uint64_t size; + std::chrono::system_clock::time_point time_point() const; +}; + +/// Access time tracker maps cache entry file paths to their last accessed +/// timestamp and file size. class access_time_tracker { using timestamp_t = uint32_t; - using table_t = absl::btree_map; - - // Serialized size of each pair in table_t - static constexpr size_t table_item_size = 8; + using table_t = absl::btree_map; public: - /// Add access time to the container. - void add_timestamp( - std::string_view key, std::chrono::system_clock::time_point ts); + /// Add metadata to the container. + void add( + ss::sstring path, + std::chrono::system_clock::time_point atime, + size_t size); /// Remove key from the container. - void remove_timestamp(std::string_view) noexcept; + void remove(std::string_view) noexcept; - /// Return access time estimate (it can differ if there is a conflict - /// on file name hash). - std::optional - estimate_timestamp(std::string_view key) const; + /// Return file metadata for key. 
+ std::optional get(const std::string& key) const; - ss::future<> write(ss::output_stream&); + ss::future<> write( + ss::output_stream&, tracker_version version = tracker_version::v2); ss::future<> read(ss::input_stream&); /// Returns true if tracker has new data which wasn't serialized /// to disk. bool is_dirty() const; - /// Remove every key which isn't present in list of existing files + /// Remove every key which isn't present in list of input files ss::future<> trim(const fragmented_vector&); size_t size() const { return _table.size(); } private: - /// Returns true if the key's access time should be tracked. + /// Returns true if the key's metadata should be tracked. /// We do not wish to track index files and transaction manifests /// as they are just an appendage to segment/chunk files and are /// purged along with them. @@ -79,17 +77,17 @@ class access_time_tracker { /// Drain _pending_upserts for any writes made while table lock was held void on_released_table_lock(); - absl::btree_map _table; + table_t _table; // Lock taken during async loops over the table (ser/de and trim()) // modifications may proceed without the lock if it is not taken. // When releasing lock, drain _pending_upserts. ss::semaphore _table_lock{1}; - // Calls into add_timestamp/remove_timestamp populate this - // if the _serialization_lock is unavailable. The serialization code is - // responsible for draining it upon releasing the lock. - absl::btree_map> _pending_upserts; + // Calls into add/remove populate this if the _serialization_lock is + // unavailable. The serialization code is responsible for draining it upon + // releasing the lock. 
+ absl::btree_map> _pending_upserts; bool _dirty{false}; }; diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 9ade85b43b28b..09c162b9379de 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -626,7 +626,7 @@ cache::remove_segment_full(const file_list_item& file_stat) { // Remove key if possible to make sure there is no resource // leak - _access_time_tracker.remove_timestamp(std::string_view(file_stat.path)); + _access_time_tracker.remove(file_stat.path); vlog( cst_log.trace, @@ -772,8 +772,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // exhaustive trim because they are occupying too much space. try { co_await delete_file_and_empty_parents(file_stat.path); - _access_time_tracker.remove_timestamp( - std::string_view(file_stat.path)); + _access_time_tracker.remove(file_stat.path); _current_cache_size -= std::min( file_stat.size, _current_cache_size); @@ -1052,19 +1051,22 @@ ss::future> cache::_get(std::filesystem::path key) { vlog(cst_log.debug, "Trying to get {} from archival cache.", key.native()); probe.get(); ss::file cache_file; + + size_t data_size{0}; try { auto source = (_cache_dir / key).native(); cache_file = co_await ss::open_file_dma(source, ss::open_flags::ro); + data_size = co_await cache_file.size(); // Bump access time of the file if (ss::this_shard_id() == 0) { - _access_time_tracker.add_timestamp( - source, std::chrono::system_clock::now()); + _access_time_tracker.add( + source, std::chrono::system_clock::now(), data_size); } else { - ssx::spawn_with_gate(_gate, [this, source] { - return container().invoke_on(0, [source](cache& c) { - c._access_time_tracker.add_timestamp( - source, std::chrono::system_clock::now()); + ssx::spawn_with_gate(_gate, [this, source, data_size] { + return container().invoke_on(0, [source, data_size](cache& c) { + c._access_time_tracker.add( + source, std::chrono::system_clock::now(), data_size); }); 
}); } @@ -1077,7 +1079,6 @@ ss::future> cache::_get(std::filesystem::path key) { } } - auto data_size = co_await cache_file.size(); probe.cached_get(); co_return std::optional(cache_item{std::move(cache_file), data_size}); } @@ -1264,7 +1265,7 @@ ss::future<> cache::_invalidate(const std::filesystem::path& key) { try { auto path = (_cache_dir / key).native(); auto stat = co_await ss::file_stat(path); - _access_time_tracker.remove_timestamp(key.native()); + _access_time_tracker.remove(key.native()); co_await delete_file_and_empty_parents(path); _current_cache_size -= stat.size; _current_cache_objects -= 1; @@ -1472,15 +1473,16 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { auto rel_path = _cache_dir / std::filesystem::relative( std::filesystem::path(file_stat.path), _cache_dir); - auto estimate = _access_time_tracker.estimate_timestamp( - rel_path.native()); - if (estimate != file_stat.access_time) { + + if (auto estimate = _access_time_tracker.get(rel_path.native()); + estimate.has_value() + && estimate->time_point() != file_stat.access_time) { vlog( cst_log.trace, "carryover file {} was accessed ({}) since the last trim ({}), " "ignoring", rel_path.native(), - estimate->time_since_epoch().count(), + estimate->atime_sec, file_stat.access_time.time_since_epoch().count()); // The file was accessed since we get the stats continue; diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index e6c3cc5f57e14..91ad90dd7d720 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -43,24 +43,31 @@ struct walk_accumulator { ss::future<> visit(ss::sstring const& target, ss::directory_entry entry) { auto entry_path = fmt::format("{}/{}", target, entry.name); if (entry.type && entry.type == ss::directory_entry_type::regular) { - auto file_stats = co_await ss::file_stat(entry_path); + size_t file_size{0}; + 
std::chrono::system_clock::time_point atime; + if (const auto tracker_entry = tracker.get(entry_path); + tracker_entry.has_value()) { + file_size = tracker_entry->size; + atime = tracker_entry->time_point(); + } else { + auto file_stats = co_await ss::file_stat(entry_path); + file_size = file_stats.size; + atime = file_stats.time_accessed; + } vlog( cst_log.debug, "Regular file found {} ({})", entry_path, - file_stats.size); - - auto last_access_timepoint = tracker.estimate_timestamp(entry_path) - .value_or(file_stats.time_accessed); + file_size); - current_cache_size += static_cast(file_stats.size); + current_cache_size += static_cast(file_size); if (!filter || filter.value()(entry_path)) { files.push_back( - {last_access_timepoint, + {atime, (std::filesystem::path(target) / entry.name.data()).native(), - static_cast(file_stats.size)}); + static_cast(file_size)}); } else if (filter) { ++filtered_out_files; } diff --git a/src/v/cloud_storage/tests/cache_bench.cc b/src/v/cloud_storage/tests/cache_bench.cc index f16a48731470f..70e5c176b5199 100644 --- a/src/v/cloud_storage/tests/cache_bench.cc +++ b/src/v/cloud_storage/tests/cache_bench.cc @@ -28,7 +28,7 @@ static void run_test(int test_scale) { for (int i = 0; i < test_scale; i++) { perf_tests::start_measuring_time(); - tracker.add_timestamp(names[i % test_scale], make_ts(i)); + tracker.add(names[i % test_scale], make_ts(i), 0); perf_tests::stop_measuring_time(); } } diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 8c2d5b21fbd64..b171ce05224db 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -332,7 +332,7 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker) { make_ts(1653000009), }; - std::vector names = { + std::vector names = { "key0", "key1", "key2", @@ -346,20 +346,23 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker) { }; for (int i = 0; i < 10; i++) { - cm.add_timestamp(names[i], timestamps[i]); + 
cm.add(names[i], timestamps[i], i); } for (int i = 0; i < 10; i++) { - auto ts = cm.estimate_timestamp(names[i]); - BOOST_REQUIRE(ts.value() >= timestamps[i]); + auto ts = cm.get(names[i]); + BOOST_REQUIRE(ts.has_value()); + BOOST_REQUIRE(ts->time_point() == timestamps[i]); + BOOST_REQUIRE(ts->size == i); } } -static access_time_tracker serde_roundtrip(access_time_tracker& t) { +static access_time_tracker serde_roundtrip( + access_time_tracker& t, tracker_version version = tracker_version::v2) { // Round trip iobuf serialized; auto out_stream = make_iobuf_ref_output_stream(serialized); - t.write(out_stream).get(); + t.write(out_stream, version).get(); out_stream.flush().get(); access_time_tracker out; @@ -393,7 +396,7 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer) { make_ts(0xbeefed09), }; - std::vector names = { + std::vector names = { "key0", "key1", "key2", @@ -407,15 +410,16 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer) { }; for (int i = 0; i < 10; i++) { - in.add_timestamp(names[i], timestamps[i]); + in.add(names[i], timestamps[i], i); } auto out = serde_roundtrip(in); for (int i = 0; i < timestamps.size(); i++) { - auto ts = out.estimate_timestamp(names[i]); + auto ts = out.get(names[i]); BOOST_REQUIRE(ts.has_value()); - BOOST_REQUIRE(ts.value() >= timestamps[i]); + BOOST_REQUIRE(ts->time_point() == timestamps[i]); + BOOST_REQUIRE(ts->size == i); } } @@ -426,11 +430,24 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer_large) { // to verify the chunking code works properly; uint32_t item_count = 7777; for (uint32_t i = 0; i < item_count; i++) { - in.add_timestamp(fmt::format("key{:08x}", i), make_ts(i)); + in.add(fmt::format("key{:08x}", i), make_ts(i), i); } auto out = serde_roundtrip(in); BOOST_REQUIRE_EQUAL(out.size(), item_count); + for (size_t i = 0; i < item_count; ++i) { + const auto entry = out.get(fmt::format("key{:08x}", i)); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, i); +
BOOST_REQUIRE_EQUAL(entry->atime_sec, i); + } +} + +SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) { + access_time_tracker in; + in.add("key", make_ts(0), 0); + auto out = serde_roundtrip(in, tracker_version::v1); + BOOST_REQUIRE_EQUAL(out.size(), 0); } /** From 19de647cba2ff384ad1773ee5671a43660f700e0 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Thu, 6 Jun 2024 10:13:56 +0530 Subject: [PATCH 2/5] cst/cache: Use access tracker for trim When trimming a first attempt is made to get LRU entries from the access tracker. If this does not free up enough space, then we proceed with the existing trim based on directory walk. (cherry picked from commit 63afff2740c45d93ed20bcddaa05e2bb1e1dc1b2) --- src/v/cloud_storage/access_time_tracker.cc | 10 ++ src/v/cloud_storage/access_time_tracker.h | 2 + src/v/cloud_storage/cache_service.cc | 143 ++++++++++++++------- src/v/cloud_storage/cache_service.h | 5 + 4 files changed, 112 insertions(+), 48 deletions(-) diff --git a/src/v/cloud_storage/access_time_tracker.cc b/src/v/cloud_storage/access_time_tracker.cc index 6f78b1b7c79dc..a25774f0bf995 100644 --- a/src/v/cloud_storage/access_time_tracker.cc +++ b/src/v/cloud_storage/access_time_tracker.cc @@ -267,6 +267,16 @@ access_time_tracker::get(const std::string& key) const { bool access_time_tracker::is_dirty() const { return _dirty; } +fragmented_vector access_time_tracker::lru_entries() const { + fragmented_vector items; + items.reserve(_table.size()); + for (const auto& [path, metadata] : _table) { + items.emplace_back(metadata.time_point(), path, metadata.size); + } + std::ranges::sort( + items, {}, [](const auto& item) { return item.access_time; }); + return items; +} std::chrono::system_clock::time_point file_metadata::time_point() const { return std::chrono::system_clock::time_point{ diff --git a/src/v/cloud_storage/access_time_tracker.h b/src/v/cloud_storage/access_time_tracker.h index aa545c5032b75..ff95441cc14ed 100644 --- 
a/src/v/cloud_storage/access_time_tracker.h +++ b/src/v/cloud_storage/access_time_tracker.h @@ -67,6 +67,8 @@ class access_time_tracker { size_t size() const { return _table.size(); } + fragmented_vector lru_entries() const; + private: /// Returns true if the key's metadata should be tracked. /// We do not wish to track index files and transaction manifests diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 09c162b9379de..73fb8367ab5ef 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -315,20 +315,6 @@ ss::future<> cache::trim( std::optional object_limit_override) { vassert(ss::this_shard_id() == 0, "Method can only be invoked on shard 0"); auto guard = _gate.hold(); - auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] - = co_await _walker.walk( - _cache_dir.native(), - _access_time_tracker, - _walk_concurrency(), - [](std::string_view path) { - return !( - std::string_view(path).ends_with(".tx") - || std::string_view(path).ends_with(".index")); - }); - - // Updating the access time tracker in case if some files were removed - // from cache directory by the user manually. 
- co_await _access_time_tracker.trim(candidates_for_deletion); auto size_limit = size_limit_override.value_or(_max_bytes); auto object_limit = object_limit_override.value_or(_max_objects()); @@ -368,6 +354,83 @@ ss::future<> cache::trim( target_size); } + if ( + _current_cache_size + _reserved_cache_size < target_size + && _current_cache_objects + _reserved_cache_objects < target_objects) { + // Exit early if we are already within the target + co_return; + } + + // Calculate how much to delete + auto size_to_delete + = (_current_cache_size + _reserved_cache_size) + - std::min(target_size, _current_cache_size + _reserved_cache_size); + auto objects_to_delete + = _current_cache_objects + _reserved_cache_objects + - std::min( + target_objects, _current_cache_objects + _reserved_cache_objects); + + auto tracker_lru_entries = _access_time_tracker.lru_entries(); + vlog( + cst_log.debug, + "in-memory trim: set target_size {}/{}, size {}/{}, reserved {}/{}, " + "pending {}/{}), candidates for deletion: {}, size to delete: {}, " + "objects to delete: {}", + target_size, + target_objects, + _current_cache_size, + _current_cache_objects, + _reserved_cache_size, + _reserved_cache_objects, + _reservations_pending, + _reservations_pending_objects, + tracker_lru_entries.size(), size_to_delete, objects_to_delete); + + auto trim_result = co_await do_trim( + tracker_lru_entries, size_to_delete, objects_to_delete); + + vlog( + cst_log.debug, + "in-memory trim result: deleted size: {}, deleted count: {}", + trim_result.deleted_size, + trim_result.deleted_count); + + _total_cleaned += trim_result.deleted_size; + probe.set_size(_current_cache_size); + probe.set_num_files(_current_cache_objects); + + size_to_delete -= std::min(trim_result.deleted_size, size_to_delete); + objects_to_delete -= std::min(trim_result.deleted_count, objects_to_delete); + + // Subsequent calculations require knowledge of how much data cannot + // possibly be deleted (because all trims skip it) in order to decide + // whether the trim worked
properly. + static constexpr size_t undeletable_objects = 1; + auto undeletable_bytes = (co_await access_time_tracker_size()).value_or(0); + + if ( + size_to_delete < undeletable_bytes + && objects_to_delete < undeletable_objects) { + _last_clean_up = ss::lowres_clock::now(); + _last_trim_failed = false; + co_return; + } + + auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] + = co_await _walker.walk( + _cache_dir.native(), + _access_time_tracker, + _walk_concurrency(), + [](std::string_view path) { + return !( + std::string_view(path).ends_with(".tx") + || std::string_view(path).ends_with(".index")); + }); + + // Updating the access time tracker in case if some files were removed + // from cache directory by the user manually. + co_await _access_time_tracker.trim(candidates_for_deletion); + // Calculate total space used by tmp files: we will use this later // when updating current_cache_size. uint64_t tmp_files_size{0}; @@ -396,28 +459,12 @@ ss::future<> cache::trim( candidates_for_deletion.size(), filtered_out_files); - if ( - _current_cache_size + _reserved_cache_size < target_size - && _current_cache_objects + _reserved_cache_objects < target_objects) { - // Exit early if we are already within the target - co_return; - } - // Sort by atime for the subsequent LRU trimming loop std::sort( candidates_for_deletion.begin(), candidates_for_deletion.end(), [](auto& a, auto& b) { return a.access_time < b.access_time; }); - // Calculate how much to delete - auto size_to_delete - = (_current_cache_size + _reserved_cache_size) - - std::min(target_size, _current_cache_size + _reserved_cache_size); - auto objects_to_delete - = _current_cache_objects + _reserved_cache_objects - - std::min( - target_objects, _current_cache_objects + _reserved_cache_objects); - vlog( cst_log.debug, "trim: removing {}/{} bytes, {}/{} objects ({}% of cache) to reach " @@ -432,15 +479,9 @@ ss::future<> cache::trim( tmp_files_size); // Execute the ordinary trim, prioritize 
removing - auto fast_result = co_await trim_fast( + trim_result = co_await trim_fast( candidates_for_deletion, size_to_delete, objects_to_delete); - // Subsequent calculations require knowledge of how much data cannot - // possibly be deleted (because all trims skip it) in order to decide - // whether the trim worked properly. - static constexpr size_t undeletable_objects = 1; - auto undeletable_bytes = (co_await access_time_tracker_size()).value_or(0); - // We aim to keep current_cache_size continuously up to date, but // in case of housekeeping issues, correct it if it apepars to have // drifted too far from the result of our directory walk. @@ -448,7 +489,7 @@ ss::future<> cache::trim( // by the amount of data currently in tmp files, because they may be // updated while the walk is happening. uint64_t cache_size_lower_bound = walked_cache_size - - fast_result.deleted_size + - trim_result.deleted_size - tmp_files_size - undeletable_bytes; if (_current_cache_size < cache_size_lower_bound) { vlog( @@ -459,7 +500,7 @@ ss::future<> cache::trim( _current_cache_size = cache_size_lower_bound; _current_cache_objects = filtered_out_files + candidates_for_deletion.size() - - fast_result.deleted_count; + - trim_result.deleted_count; } const auto cache_entries_before_trim = candidates_for_deletion.size() @@ -468,29 +509,29 @@ ss::future<> cache::trim( vlog( cst_log.debug, "trim: deleted {}/{} files of total size {}. 
Undeletable size {}.", - fast_result.deleted_count, + trim_result.deleted_count, cache_entries_before_trim, - fast_result.deleted_size, + trim_result.deleted_size, undeletable_bytes); - _total_cleaned += fast_result.deleted_size; + _total_cleaned += trim_result.deleted_size; probe.set_size(_current_cache_size); - probe.set_num_files(cache_entries_before_trim - fast_result.deleted_count); + probe.set_num_files(cache_entries_before_trim - trim_result.deleted_count); - size_to_delete -= std::min(fast_result.deleted_size, size_to_delete); - objects_to_delete -= std::min(fast_result.deleted_count, objects_to_delete); + size_to_delete -= std::min(trim_result.deleted_size, size_to_delete); + objects_to_delete -= std::min(trim_result.deleted_count, objects_to_delete); // Before we (maybe) proceed to do an exhaustive trim, make sure we're not // trying to trim more data than was physically seen while walking the // cache. size_to_delete = std::min( - walked_cache_size - fast_result.deleted_size, size_to_delete); + walked_cache_size - trim_result.deleted_size, size_to_delete); // If we were not able to delete enough files and there are some filtered // out files, force an exhaustive trim. This ensures that if the cache is // dominated by filtered out files, we do not skip trimming them by reducing // the objects_to_delete counter next. - bool force_exhaustive_trim = fast_result.deleted_count < objects_to_delete + bool force_exhaustive_trim = trim_result.deleted_count < objects_to_delete && filtered_out_files > 0; // In the situation where all files in cache are filtered out, @@ -500,7 +541,7 @@ ss::future<> cache::trim( // force_exhaustive_trim avoids this. 
if (!force_exhaustive_trim) { objects_to_delete = std::min( - candidates_for_deletion.size() - fast_result.deleted_count, + candidates_for_deletion.size() - trim_result.deleted_count, objects_to_delete); } @@ -651,7 +692,13 @@ ss::future cache::trim_fast( uint64_t size_to_delete, size_t objects_to_delete) { probe.fast_trim(); + co_return co_await do_trim(candidates, size_to_delete, objects_to_delete); +} +ss::future cache::do_trim( + const fragmented_vector& candidates, + uint64_t size_to_delete, + size_t objects_to_delete) { trim_result result; // Reset carryover list diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index aa12d45d8aad3..9827f056d6116 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -235,6 +235,11 @@ class cache : public ss::peering_sharded_service { uint64_t delete_bytes, size_t delete_objects); + ss::future do_trim( + const fragmented_vector& candidates, + uint64_t delete_bytes, + size_t delete_objects); + /// Exhaustive trim: walk all files including indices, remove whatever is /// least recently accessed. 
ss::future From d853f39977ea9272b4345384f47080290b359757 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Fri, 7 Jun 2024 09:23:13 +0530 Subject: [PATCH 3/5] cst/cache: Add metric for in-memory trims (cherry picked from commit e256efb83313e0774862996e45e11ea15fa81e3e) --- src/v/cloud_storage/cache_probe.cc | 6 ++++++ src/v/cloud_storage/cache_probe.h | 2 ++ src/v/cloud_storage/cache_service.cc | 1 + 3 files changed, 9 insertions(+) diff --git a/src/v/cloud_storage/cache_probe.cc b/src/v/cloud_storage/cache_probe.cc index 5bd06b72fb7c2..5504af4dc96d1 100644 --- a/src/v/cloud_storage/cache_probe.cc +++ b/src/v/cloud_storage/cache_probe.cc @@ -114,6 +114,12 @@ cache_probe::cache_probe() { "Number of times could not free the expected amount of " "space, indicating possible bug or configuration issue.")) .aggregate(aggregate_labels), + sm::make_counter( + "in_mem_trims", + [this] { return _in_mem_trims; }, + sm::description("Number of times we trimmed the cache using " + "the in-memory access tracker.")) + .aggregate(aggregate_labels), }); } diff --git a/src/v/cloud_storage/cache_probe.h b/src/v/cloud_storage/cache_probe.h index 791d211b5f031..2eb1238c59101 100644 --- a/src/v/cloud_storage/cache_probe.h +++ b/src/v/cloud_storage/cache_probe.h @@ -41,6 +41,7 @@ class cache_probe { void exhaustive_trim() { ++_exhaustive_trims; } void carryover_trim() { ++_carryover_trims; } void failed_trim() { ++_failed_trims; } + void in_mem_trim() { ++_in_mem_trims; } private: uint64_t _num_puts = 0; @@ -58,6 +59,7 @@ class cache_probe { int64_t _exhaustive_trims{0}; int64_t _carryover_trims{0}; int64_t _failed_trims{0}; + int64_t _in_mem_trims{0}; metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 73fb8367ab5ef..f3b8a532e242b 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -389,6 +389,7 @@ ss::future<> 
cache::trim( auto trim_result = co_await do_trim( tracker_lru_entries, size_to_delete, objects_to_delete); + probe.in_mem_trim(); vlog( cst_log.debug, "in-memory trim result: deleted size: {}, deleted count: {}", From 4ad8c1d935a03a962ed8130a4ce8f1b18ab98ace Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Thu, 6 Jun 2024 10:11:32 +0530 Subject: [PATCH 4/5] cst/cache: Sync tracker periodically The tracker is synced periodically with on-disk data using a directory walk. This is necessary because once we switch to trimming using the tracker, the data in tracker and on-disk may drift because of tmp files, aborted downloads or manual deletion of files. A periodic sync brings the tracker in sync with the metadata on disk. (cherry picked from commit ee90a7b52c396b07bdb7dea2dc783020f42ed9a4) --- src/v/cloud_storage/access_time_tracker.cc | 35 ++++- src/v/cloud_storage/access_time_tracker.h | 5 +- src/v/cloud_storage/cache_probe.cc | 13 +- src/v/cloud_storage/cache_probe.h | 7 + src/v/cloud_storage/cache_service.cc | 133 +++++++++++++++--- src/v/cloud_storage/cache_service.h | 7 + .../recursive_directory_walker.cc | 7 +- .../recursive_directory_walker.h | 3 + src/v/cloud_storage/tests/cache_test.cc | 49 +++++++ .../cloud_storage/tests/cache_test_fixture.h | 11 ++ 10 files changed, 238 insertions(+), 32 deletions(-) diff --git a/src/v/cloud_storage/access_time_tracker.cc b/src/v/cloud_storage/access_time_tracker.cc index a25774f0bf995..358050375e5ba 100644 --- a/src/v/cloud_storage/access_time_tracker.cc +++ b/src/v/cloud_storage/access_time_tracker.cc @@ -22,6 +22,7 @@ #include #include +#include #include namespace absl { @@ -98,7 +99,9 @@ ss::future<> access_time_tracker::write( } bool access_time_tracker::should_track(std::string_view key) const { - if (key.ends_with(".tx") || key.ends_with(".index")) { + if ( + key.ends_with(".tx") || key.ends_with(".index") + || key.ends_with(cache_tmp_file_extension)) { return false; } @@ -231,22 +234,42 @@ void 
access_time_tracker::remove(std::string_view key) noexcept { } } -ss::future<> -access_time_tracker::trim(const fragmented_vector& existent) { - absl::btree_set existent_hashes; +ss::future<> access_time_tracker::sync( + const fragmented_vector& existent, + add_entries_t add_entries) { + absl::btree_set paths; for (const auto& i : existent) { - existent_hashes.insert(i.path); + paths.insert(i.path); } auto lock_guard = co_await ss::get_units(_table_lock, 1); table_t tmp; + for (const auto& it : _table) { - if (existent_hashes.contains(it.first)) { + if (paths.contains(it.first)) { tmp.insert(it); } co_await ss::maybe_yield(); } + + if (add_entries) { + auto should_add = [this, &tmp](const auto& e) { + return should_track(e.path) && !tmp.contains(e.path); + }; + for (const auto& entry : existent | std::views::filter(should_add)) { + _dirty = true; + tmp.insert( + {entry.path, + {static_cast( + std::chrono::time_point_cast( + entry.access_time) + .time_since_epoch() + .count()), + entry.size}}); + } + } + if (_table.size() != tmp.size()) { // We dropped one or more entries, therefore mutated the table. _dirty = true; diff --git a/src/v/cloud_storage/access_time_tracker.h b/src/v/cloud_storage/access_time_tracker.h index ff95441cc14ed..b402be04bf434 100644 --- a/src/v/cloud_storage/access_time_tracker.h +++ b/src/v/cloud_storage/access_time_tracker.h @@ -62,8 +62,11 @@ class access_time_tracker { /// to disk. 
bool is_dirty() const; + using add_entries_t = ss::bool_class; /// Remove every key which isn't present in list of input files - ss::future<> trim(const fragmented_vector&); + ss::future<> sync( + const fragmented_vector&, + add_entries_t add_entries = add_entries_t::no); size_t size() const { return _table.size(); } diff --git a/src/v/cloud_storage/cache_probe.cc b/src/v/cloud_storage/cache_probe.cc index 5504af4dc96d1..c5da6e1ad77d8 100644 --- a/src/v/cloud_storage/cache_probe.cc +++ b/src/v/cloud_storage/cache_probe.cc @@ -37,7 +37,6 @@ cache_probe::cache_probe() { [this] { return _num_cached_gets; }, sm::description( "Total number of get requests that are already in cache.")), - sm::make_gauge( "size_bytes", [this] { return _cur_size_bytes; }, @@ -84,6 +83,18 @@ cache_probe::cache_probe() { sm::description( "High watermark of number of objects in cache.")) .aggregate(aggregate_labels), + sm::make_counter( + "tracker_syncs", + [this] { return _tracker_syncs; }, + sm::description( + "Number of times the access tracker was updated " + "with cache disk data")) + .aggregate(aggregate_labels), + sm::make_gauge( + "tracker_size", + [this] { return _tracker_size; }, + sm::description("Number of entries in cache access tracker")) + .aggregate(aggregate_labels), }); _public_metrics.add_group( diff --git a/src/v/cloud_storage/cache_probe.h b/src/v/cloud_storage/cache_probe.h index 2eb1238c59101..dfbe4e0f047fb 100644 --- a/src/v/cloud_storage/cache_probe.h +++ b/src/v/cloud_storage/cache_probe.h @@ -37,12 +37,16 @@ class cache_probe { void put_started() { ++_cur_in_progress_files; } void put_ended() { --_cur_in_progress_files; } + void set_tracker_size(uint64_t size) { _tracker_size = size; } + void fast_trim() { ++_fast_trims; } void exhaustive_trim() { ++_exhaustive_trims; } void carryover_trim() { ++_carryover_trims; } void failed_trim() { ++_failed_trims; } void in_mem_trim() { ++_in_mem_trims; } + void tracker_sync() { ++_tracker_syncs; } + private: uint64_t _num_puts 
= 0; uint64_t _num_gets = 0; @@ -54,6 +58,7 @@ class cache_probe { int64_t _cur_num_files = 0; int64_t _hwm_num_files = 0; int64_t _cur_in_progress_files = 0; + uint64_t _tracker_size{0}; int64_t _fast_trims{0}; int64_t _exhaustive_trims{0}; @@ -61,6 +66,8 @@ class cache_probe { int64_t _failed_trims{0}; int64_t _in_mem_trims{0}; + uint64_t _tracker_syncs{0}; + metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; }; diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index f3b8a532e242b..66a2c9184b9b9 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -68,7 +68,7 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) { return o; } -static constexpr std::string_view tmp_extension{".part"}; +static constexpr auto tracker_sync_period = 3600s * 6; cache::cache( std::filesystem::path cache_dir, @@ -190,7 +190,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; } ss::future<> cache::clean_up_at_start() { auto guard = _gate.hold(); - auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs] + auto + [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs, _] = co_await _walker.walk( _cache_dir.native(), _access_time_tracker, _walk_concurrency()); @@ -201,7 +202,21 @@ ss::future<> cache::clean_up_at_start() { // The state of the _access_time_tracker and the actual content of the // cache directory might diverge over time (if the user removes segment // files manually). We need to take this into account. - co_await _access_time_tracker.trim(candidates_for_deletion); + + // On startup we perform a bi-directional sync, IE entries found during + // directory walk which are not in tracker are added to it. This covers the + // following scenarios: + // 1. Following an upgrade, the tracker was loaded as empty to discard + // previous serialized data. 
Now we need to rehydrate the tracker and it is + // easier to do it now than wait for get requests to do this. + // 2. In a previous run the tracker had entries which it was not able to + // write to disk due to a crash. A directory walk will bring the tracker to + // an up to date state. + co_await _access_time_tracker.sync( + candidates_for_deletion, access_time_tracker::add_entries_t::yes); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); uint64_t deleted_bytes{0}; size_t deleted_count{0}; @@ -209,7 +224,8 @@ ss::future<> cache::clean_up_at_start() { auto filepath_to_remove = file_item.path; // delete only tmp files that are left from previous RedPanda run - if (std::string_view(filepath_to_remove).ends_with(tmp_extension)) { + if (std::string_view(filepath_to_remove) + .ends_with(cache_tmp_file_extension)) { try { co_await delete_file_and_empty_parents(filepath_to_remove); deleted_bytes += file_item.size; @@ -384,7 +400,9 @@ ss::future<> cache::trim( _reserved_cache_objects, _reservations_pending, _reservations_pending_objects, - tracker_lru_entries.size()); + tracker_lru_entries.size(), + size_to_delete, + objects_to_delete); auto trim_result = co_await do_trim( tracker_lru_entries, size_to_delete, objects_to_delete); @@ -417,7 +435,16 @@ ss::future<> cache::trim( co_return; } - auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] + // We are going to do a walk, rearm the periodic tracker sync if it is about + // to run soon. + _tracker_sync_timer.rearm(ss::lowres_clock::now() + tracker_sync_period); + + auto + [walked_cache_size, + filtered_out_files, + candidates_for_deletion, + _, + tmp_files_size] = co_await _walker.walk( _cache_dir.native(), _access_time_tracker, @@ -430,16 +457,10 @@ ss::future<> cache::trim( // Updating the access time tracker in case if some files were removed // from cache directory by the user manually. 
- co_await _access_time_tracker.trim(candidates_for_deletion); - - // Calculate total space used by tmp files: we will use this later - // when updating current_cache_size. - uint64_t tmp_files_size{0}; - for (const auto& i : candidates_for_deletion) { - if (std::string_view(i.path).ends_with(tmp_extension)) { - tmp_files_size += i.size; - } - } + co_await _access_time_tracker.sync(candidates_for_deletion); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); vlog( cst_log.debug, @@ -711,7 +732,8 @@ ss::future cache::do_trim( } // skip tmp files since someone may be writing to it - if (std::string_view(file_stat.path).ends_with(tmp_extension)) { + if (std::string_view(file_stat.path) + .ends_with(cache_tmp_file_extension)) { return true; } @@ -785,7 +807,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // Enumerate ALL files in the cache (as opposed to trim_fast that strips out // indices/tx/tmp files) - auto [walked_cache_size, _filtered_out, candidates, _] + auto [walked_cache_size, _filtered_out, candidates, _, tmp_files_size] = co_await _walker.walk( _cache_dir.native(), _access_time_tracker, _walk_concurrency()); @@ -839,7 +861,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // We are shutting down, stop iterating and propagate throw; } catch (const std::filesystem::filesystem_error& e) { - if (likely(file_stat.path.ends_with(tmp_extension))) { + if (likely(file_stat.path.ends_with(cache_tmp_file_extension))) { // In exhaustive scan we might hit a .part file and get ENOENT, // this is expected behavior occasionally. 
result.trim_missed_tmp_files = true; @@ -989,15 +1011,30 @@ ss::future<> cache::start() { }); }); _tracker_timer.arm_periodic(access_timer_period); + + _tracker_sync_timer.set_callback([this] { + ssx::spawn_with_gate(_gate, [this]() -> ss::future<> { + return sync_access_time_tracker().handle_exception( + [](auto eptr) { + vlog( + cst_log.error, + "failed to sync access time tracker: {}", + eptr); + }); + }); + }); + _tracker_sync_timer.arm(tracker_sync_period); } } ss::future<> cache::stop() { vlog(cst_log.debug, "Stopping archival cache service"); _tracker_timer.cancel(); + _tracker_sync_timer.cancel(); _as.request_abort(); _block_puts_cond.broken(); _cleanup_sm.broken(); + _tracker_sync_timer_sem.broken(); if (ss::this_shard_id() == 0) { co_await save_access_time_tracker().handle_exception([](auto eptr) { // NOTE: see issue/11270 if the exception is "filesystem error: @@ -1169,11 +1206,12 @@ ss::future<> cache::put( probe.put_ended(); }); auto filename = normal_key_path.filename(); - if (std::string_view(filename.native()).ends_with(tmp_extension)) { + if (std::string_view(filename.native()) + .ends_with(cache_tmp_file_extension)) { throw std::invalid_argument(fmt::format( "Cache file key {} is ending with tmp extension {}.", normal_key_path.native(), - tmp_extension)); + cache_tmp_file_extension)); } auto dir_path = normal_key_path.remove_filename(); @@ -1187,7 +1225,7 @@ ss::future<> cache::put( filename.native(), ss::this_shard_id(), (++_cnt), - tmp_extension)); + cache_tmp_file_extension)); ss::file tmp_cache_file; while (true) { @@ -1507,7 +1545,8 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { // Don't hit access time tracker file/tmp if ( is_trim_exempt(file_stat.path) - || std::string_view(file_stat.path).ends_with(tmp_extension)) { + || std::string_view(file_stat.path) + .ends_with(cache_tmp_file_extension)) { continue; } // Both tx and index files are handled as part of the segment @@ -1865,4 +1904,52 @@ ss::future<> 
cache::initialize(std::filesystem::path cache_dir) { co_await ss::recursive_touch_directory(cache_dir.string()); } } + +ss::future<> cache::sync_access_time_tracker( + access_time_tracker::add_entries_t add_entries) { + if (_cleanup_sm.available_units() <= 0) { + vlog( + cst_log.debug, + "syncing access time tracker postponed, trim is running"); + _tracker_sync_timer.rearm( + ss::lowres_clock::now() + tracker_sync_period); + co_return; + } + + if (_tracker_sync_timer_sem.try_wait()) { + vlog(cst_log.debug, "syncing access time tracker with disk"); + auto [cache_size, filtered_out, items, empty_dirs, tmp_files_size] + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); + + co_await _access_time_tracker.sync(items, add_entries); + vlog( + cst_log.debug, + "syncing access time tracker with disk complete: cache size {}, " + "items: {}", + cache_size, + items.size()); + + const auto tracker_size + = (co_await access_time_tracker_size()).value_or(0); + + _current_cache_size = cache_size - tmp_files_size - tracker_size; + _current_cache_objects = items.size(); + + probe.set_size(_current_cache_size); + probe.set_num_files(_current_cache_objects); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); + + _tracker_sync_timer.rearm( + ss::lowres_clock::now() + tracker_sync_period); + } else { + vlog( + cst_log.debug, + "syncing access time tracker with disk skipped, sync is already " + "running"); + } +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 9827f056d6116..e88040310f505 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -312,6 +312,9 @@ class cache : public ss::peering_sharded_service { ss::future remove_segment_full(const file_list_item& file_stat); + ss::future<> sync_access_time_tracker( + access_time_tracker::add_entries_t add_entries + = access_time_tracker::add_entries_t::no); 
std::filesystem::path _cache_dir; size_t _disk_size; config::binding _disk_reservation; @@ -382,6 +385,10 @@ class cache : public ss::peering_sharded_service { // List of probable deletion candidates from the last trim. std::optional> _last_trim_carryover; + + ss::timer _tracker_sync_timer; + ssx::semaphore _tracker_sync_timer_sem{ + 1, "cloud/cache/access_tracker_sync"}; }; } // namespace cloud_storage diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index 91ad90dd7d720..a90ab34d103fe 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -62,6 +62,9 @@ struct walk_accumulator { file_size); current_cache_size += static_cast(file_size); + if (entry_path.ends_with(cache_tmp_file_extension)) { + tmp_files_size += file_size; + } if (!filter || filter.value()(entry_path)) { files.push_back( @@ -92,6 +95,7 @@ struct walk_accumulator { fragmented_vector files; uint64_t current_cache_size{0}; size_t filtered_out_files{0}; + size_t tmp_files_size{0}; }; } // namespace cloud_storage @@ -198,7 +202,8 @@ ss::future recursive_directory_walker::walk( .cache_size = state.current_cache_size, .filtered_out_files = state.filtered_out_files, .regular_files = std::move(state.files), - .empty_dirs = std::move(empty_dirs)}; + .empty_dirs = std::move(empty_dirs), + .tmp_files_size = state.tmp_files_size}; } ss::future<> recursive_directory_walker::stop() { diff --git a/src/v/cloud_storage/recursive_directory_walker.h b/src/v/cloud_storage/recursive_directory_walker.h index 080660cfd502a..1cabfcd20a4ff 100644 --- a/src/v/cloud_storage/recursive_directory_walker.h +++ b/src/v/cloud_storage/recursive_directory_walker.h @@ -20,6 +20,8 @@ namespace cloud_storage { +constexpr auto cache_tmp_file_extension{".part"}; + class access_time_tracker; struct file_list_item { @@ -33,6 +35,7 @@ struct walk_result { size_t filtered_out_files{0}; fragmented_vector 
regular_files; fragmented_vector empty_dirs; + size_t tmp_files_size{0}; }; class recursive_directory_walker { diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index b171ce05224db..e8208e2cec360 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -719,3 +719,52 @@ FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) { // = 40%. +1 for the object we just added. BOOST_REQUIRE_EQUAL(get_object_count(), 41); } + +FIXTURE_TEST(test_tracker_sync_only_remove, cache_test_fixture) { + put_into_cache(create_data_string('a', 1_KiB), KEY); + auto& cache = sharded_cache.local(); + cache.get(KEY).get(); + + const auto full_key_path = CACHE_DIR / KEY; + + const auto& t = tracker(); + BOOST_REQUIRE_EQUAL(t.size(), 1); + + { + const auto entry = t.get(full_key_path.native()); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); + } + + ss::remove_file(full_key_path.native()).get(); + + { + const auto entry = t.get(full_key_path.native()); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); + } + + sync_tracker(); + + BOOST_REQUIRE_EQUAL(t.size(), 0); + BOOST_REQUIRE(!t.get(full_key_path.native()).has_value()); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 0); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 0); +} + +FIXTURE_TEST(test_tracker_sync_add_remove, cache_test_fixture) { + put_into_cache(create_data_string('a', 1_KiB), KEY); + auto& cache = sharded_cache.local(); + const auto full_key_path = CACHE_DIR / KEY; + const auto& t = tracker(); + BOOST_REQUIRE_EQUAL(t.size(), 0); + sync_tracker(access_time_tracker::add_entries_t::yes); + BOOST_REQUIRE_EQUAL(t.size(), 1); + 
BOOST_REQUIRE(t.get(full_key_path.native()).has_value()); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1024); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 33a3e44dfed36..26bfbf5edfa4d 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -12,6 +12,7 @@ #include "base/seastarx.h" #include "base/units.h" #include "bytes/iobuf.h" +#include "bytes/iostream.h" #include "cloud_storage/cache_service.h" #include "config/property.h" #include "test_utils/scoped_config.h" @@ -168,6 +169,16 @@ class cache_test_fixture { } scoped_config cfg; + + void sync_tracker( + access_time_tracker::add_entries_t add_entries + = access_time_tracker::add_entries_t::no) { + sharded_cache.local().sync_access_time_tracker(add_entries).get(); + } + + const access_time_tracker& tracker() const { + return sharded_cache.local()._access_time_tracker; + } }; } // namespace cloud_storage From 11b5fefa819734d01dd5ba6711edd012904ee130 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Mon, 10 Jun 2024 14:24:28 +0530 Subject: [PATCH 5/5] cst/ducktape: Add asserts for tracker based trim (cherry picked from commit c17d79d85a729a6a23e8abe7438e1083d3e8c771) --- .../tiered_storage_cache_stress_test.py | 90 +++++++++++++++++-- .../cloud_storage_chunk_read_path_test.py | 5 +- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py b/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py index 514517c4ae2c4..267494a6ee681 100644 --- a/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py +++ b/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py @@ -6,7 +6,10 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 
+import sys +from dataclasses import dataclass +from rptest.services.metrics_check import MetricCheck from rptest.tests.redpanda_test import RedpandaTest from rptest.services.cluster import cluster from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer, KgoVerifierRandomConsumer @@ -28,6 +31,18 @@ class LimitMode(str, Enum): both = 'both' +@dataclass +class CacheExpectations: + tracker_size: int = 0 + min_mem_trims: int = 0 + max_mem_trims: int = sys.maxsize + min_fast_trims: int = 0 + max_fast_trims: int = sys.maxsize + min_exhaustive_trims: int = 0 + max_exhaustive_trims: int = sys.maxsize + num_syncs: int = 0 + + class TieredStorageCacheStressTest(RedpandaTest): segment_upload_interval = 30 manifest_upload_interval = 10 @@ -48,6 +63,7 @@ def setUp(self): self.segment_upload_interval, 'cloud_storage_manifest_max_upload_interval_sec': self.manifest_upload_interval, + 'disable_public_metrics': False, } self.redpanda.set_extra_rp_conf(extra_rp_conf) @@ -93,8 +109,6 @@ def _validate_node_storage(self, node, limit_mode: LimitMode, hwm_size = int(sample.value) elif sample.name == "redpanda_cloud_storage_cache_space_hwm_files": hwm_objects = int(sample.value) - #else: - # self.logger.debug(sample.name) assert hwm_size is not None, "Cache HWM metric not found" any_cache_usage = (usage['cloud_storage_cache_bytes'] > 0 @@ -361,7 +375,9 @@ def tiny_cache_test(self): cache_size, max_objects=None) - def run_test_with_cache_prefilled(self, cache_prefill_command: str): + def run_test_with_cache_prefilled(self, cache_prefill_command: str, + prefill_count: int, + expectations: CacheExpectations): segment_size = 128 * 1024 * 1024 msg_size = 16384 data_size = segment_size * 10 @@ -374,7 +390,6 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): self.redpanda.clean_node(n) # Pre-populate caches with files. 
- prefill_count = 100 for node in self.redpanda.nodes: node.account.ssh(cache_prefill_command.format(prefill_count)) @@ -391,6 +406,7 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): # Bring up redpanda self.redpanda.set_si_settings(si_settings) + self.redpanda.start(clean_nodes=False) # Cache startup should have registered the garbage objects in stats @@ -398,10 +414,32 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): for node in self.redpanda.nodes: usage = admin.get_local_storage_usage(node) assert usage[ - 'cloud_storage_cache_objects'] >= prefill_count, f"Node {node.name} has unexpectedly few objects {usage['cloud_storage_cache_objects']} < {prefill_count}" + 'cloud_storage_cache_objects'] >= prefill_count, \ + (f"Node {node.name} has unexpectedly few objects " + f"{usage['cloud_storage_cache_objects']} < {prefill_count}") # Inject data self._create_topic(topic_name, 1, segment_size) + + trim_metrics = [ + 'redpanda_cloud_storage_cache_trim_in_mem_trims_total', + 'redpanda_cloud_storage_cache_trim_fast_trims_total', + 'redpanda_cloud_storage_cache_trim_exhaustive_trims_total', + 'redpanda_cloud_storage_cache_space_tracker_syncs_total', + 'redpanda_cloud_storage_cache_space_tracker_size' + ] + m = MetricCheck(self.redpanda.logger, + self.redpanda, + self.redpanda.partitions(topic_name)[0].leader, + trim_metrics, + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS) + + m.expect([('redpanda_cloud_storage_cache_space_tracker_syncs_total', + lambda a, b: a == expectations.num_syncs == b)]) + + m.expect([('redpanda_cloud_storage_cache_space_tracker_size', + lambda a, b: a == expectations.tracker_size == b)]) + self._produce_and_quiesce(topic_name, msg_size, data_size, expect_bandwidth) @@ -424,7 +462,19 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): # reduce in number to the point that the read is able to proceed usage = admin.get_local_storage_usage(leader_node) assert usage[ - 
'cloud_storage_cache_objects'] <= cache_object_limit, f"Node {leader_node.name} has unexpectedly many objects {usage['cloud_storage_cache_objects']} > {cache_object_limit}" + 'cloud_storage_cache_objects'] <= cache_object_limit, \ + (f"Node {leader_node.name} has unexpectedly many objects " + f"{usage['cloud_storage_cache_objects']} > {cache_object_limit}") + + m.expect([('redpanda_cloud_storage_cache_trim_in_mem_trims_total', + lambda a, b: a == 0 and expectations.max_mem_trims >= b >= + expectations.min_mem_trims)]) + m.expect([('redpanda_cloud_storage_cache_trim_fast_trims_total', + lambda a, b: a == 0 and expectations.max_fast_trims >= b >= + expectations.min_fast_trims)]) + m.expect([('redpanda_cloud_storage_cache_trim_exhaustive_trims_total', + lambda a, b: a == 0 and expectations.max_exhaustive_trims >= + b >= expectations.min_exhaustive_trims)]) @cluster(num_nodes=4, log_allow_list=S3_ERROR_LOGS) def garbage_objects_test(self): @@ -438,11 +488,22 @@ def garbage_objects_test(self): https://github.com/redpanda-data/redpanda/issues/11835 """ + prefill_count = 100 + expectations = CacheExpectations( + num_syncs=1, + # The tracker will add all non-index/tx/tmp files during startup sync + tracker_size=prefill_count, + # Tracker based trim should be enough to acquire space + min_mem_trims=1, + min_fast_trims=0, + max_fast_trims=0, + min_exhaustive_trims=0, + max_exhaustive_trims=0) self.run_test_with_cache_prefilled( f"mkdir -p {self.redpanda.cache_dir} ; " "for n in `seq 1 {}`; do " - f"dd if=/dev/urandom bs=1k count=4 of={self.redpanda.cache_dir}/garbage_$n.bin ; done" - ) + f"dd if=/dev/urandom bs=1k count=4 of={self.redpanda.cache_dir}/garbage_$n.bin ; done", + prefill_count, expectations) @cluster(num_nodes=4, log_allow_list=S3_ERROR_LOGS) def test_indices_dominate_cache(self): @@ -450,9 +511,20 @@ def test_indices_dominate_cache(self): Ensures that if the cache is filled with index and tx objects alone, trimming still works. 
""" + + prefill_count = 100 + expectations = CacheExpectations( + num_syncs=1, + # Neither index nor tx files will be added to tracker during startup sync + tracker_size=0, + # Since orphan index and tx files are only cleaned up in exhaustive trim, + # all three trims have to run to acquire space + min_mem_trims=1, + min_fast_trims=1, + min_exhaustive_trims=1) self.run_test_with_cache_prefilled( f"mkdir -pv {self.redpanda.cache_dir}; " "for n in `seq 1 {}`; do " f"touch {self.redpanda.cache_dir}/garbage_$n.index && " f"touch {self.redpanda.cache_dir}/garbage_$n.tx; " - "done") + "done", prefill_count, expectations) diff --git a/tests/rptest/tests/cloud_storage_chunk_read_path_test.py b/tests/rptest/tests/cloud_storage_chunk_read_path_test.py index 0d2c5c1bd84f7..5baef70c46040 100644 --- a/tests/rptest/tests/cloud_storage_chunk_read_path_test.py +++ b/tests/rptest/tests/cloud_storage_chunk_read_path_test.py @@ -183,7 +183,10 @@ def _assert_in_cache(self, expr: str): num_nodes=4, log_allow_list=[ # Ignore trim related errors caused by deleting chunk files manually. - "failed to free sufficient space in exhaustive trim" + "failed to free sufficient space in exhaustive trim", + # With tracker based trim and the manual deletes in this test, sometimes + # the cache tries to trim chunks which have already been removed externally. + "filesystem error: remove failed: No such file or directory" ]) def test_read_chunks(self): self.default_chunk_size = 1048576