From 57985b74ff30f1cb8a27f06a4cbb688b2201b569 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Wed, 24 Apr 2024 11:55:45 +0000 Subject: [PATCH 01/10] cloud_storage: Add watchdog to the directory walk --- src/v/cloud_storage/recursive_directory_walker.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index a3c582e054582..0d432b31c5aa1 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -14,10 +14,12 @@ #include "base/vlog.h" #include "cloud_storage/access_time_tracker.h" #include "cloud_storage/logger.h" +#include "ssx/watchdog.h" #include #include #include +#include #include #include @@ -94,6 +96,13 @@ ss::future recursive_directory_walker::walk( std::optional collect_filter) { auto guard = _gate.hold(); + watchdog wd1m(std::chrono::seconds(60), [] { + vlog(cst_log.info, "Directory walk is taking more than 1 min"); + }); + watchdog wd10m(std::chrono::seconds(600), [] { + vlog(cst_log.warn, "Directory walk is taking more than 10 min"); + }); + // Object to accumulate data as we walk directories walk_accumulator state(start_dir, tracker, std::move(collect_filter)); @@ -103,7 +112,8 @@ ss::future recursive_directory_walker::walk( auto target = state.pop(); vassert( std::string_view(target).starts_with(start_dir), - "Looking at directory {}, which is outside of initial dir {}.", + "Looking at directory {}, which is outside of initial dir " + "{}.", target, start_dir); From 940fcd4d802e1f3665243f681d58b323c9e27ece Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Wed, 24 Apr 2024 18:24:24 +0000 Subject: [PATCH 02/10] config: Add cloud_storage_cache_trim_carryover Add new parameter that controls cache carryover behavior. 
--- src/v/config/configuration.cc | 11 +++++++++++ src/v/config/configuration.h | 1 + 2 files changed, 12 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 4fa14cff60d19..20a78defa49f7 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2332,6 +2332,17 @@ configuration::configuration() // Enough for a >1TiB cache of 16MiB objects. Decrease this in case // of issues with trim performance. 100000) + , cloud_storage_cache_trim_carryover( + *this, + "cloud_storage_cache_trim_carryover", + "The cache performs a recursive directory inspection during the cache " + "trim. The information obtained during the inspection can be carried " + "over to the next trim operation. This parameter sets a limit on the " + "number of objects that can be carried over from one trim to next, and " + "allows cache to quickly unblock readers before starting the directory " + "inspection.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 512) , cloud_storage_cache_check_interval_ms( *this, "cloud_storage_cache_check_interval", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 73260863689c4..010b1c693a0eb 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -425,6 +425,7 @@ struct configuration final : public config_store { bounded_property, numeric_bounds> cloud_storage_cache_size_percent; property cloud_storage_cache_max_objects; + property> cloud_storage_cache_trim_carryover; property cloud_storage_cache_check_interval_ms; property> cloud_storage_max_segment_readers_per_shard; From 5ece9d401917d929a8d5e986b6aabacfd54884c1 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Wed, 24 Apr 2024 18:25:00 +0000 Subject: [PATCH 03/10] cloud_storage: Implement "carryover" behavior The "carryover" behavior allows cache to use information from the previous trim to quickly trim the cache without scanning the whole directory. 
This allows cache to avoid blocking readers. In a situation when the cache contains a very large number of files the recursive directory walk could take a few minutes. We're not allowing number of objects stored in the cache to overshoot so all the readers are blocked until the walk is finished. This commit adds new "carryover" trim mechanism which is running before the normal trim and uses information obtained during the previous fast or full trim to delete some objects without walking the directory tree. --- src/v/cloud_storage/cache_service.cc | 382 ++++++++++++++++++++------- src/v/cloud_storage/cache_service.h | 12 + 2 files changed, 296 insertions(+), 98 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 937427bb88eb3..2ec7b2a35fdb3 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -13,9 +13,13 @@ #include "bytes/iostream.h" #include "cloud_storage/access_time_tracker.h" #include "cloud_storage/logger.h" +#include "cloud_storage/recursive_directory_walker.h" +#include "config/configuration.h" #include "seastar/util/file.hh" #include "ssx/future-util.h" +#include "ssx/sformat.h" #include "storage/segment.h" +#include "utils/human.h" #include #include @@ -529,6 +533,99 @@ ss::future<> cache::trim( _last_trim_failed = false; } +ss::future +cache::remove_segment_full(const file_list_item& file_stat) { + trim_result result; + try { + uint64_t this_segment_deleted_bytes{0}; + + auto deleted_parents = co_await delete_file_and_empty_parents( + file_stat.path); + result.deleted_size += file_stat.size; + this_segment_deleted_bytes += file_stat.size; + _current_cache_size -= file_stat.size; + _current_cache_objects -= 1; + result.deleted_count += 1; + + // Determine whether we should delete indices along with the + // object we have just deleted + std::optional tx_file; + std::optional index_file; + + if (RE2::FullMatch(file_stat.path.data(), segment_expr)) { + // If this 
was a legacy whole-segment item, delete the index + // and tx file along with the segment + tx_file = fmt::format("{}.tx", file_stat.path); + index_file = fmt::format("{}.index", file_stat.path); + } else if (deleted_parents) { + auto immediate_parent = std::string( + std::filesystem::path(file_stat.path).parent_path()); + static constexpr std::string_view chunks_suffix{"_chunks"}; + if (immediate_parent.ends_with(chunks_suffix)) { + // We just deleted the last chunk from a _chunks segment + // directory. We may delete the index + tx state for + // that segment. + auto base_segment_path = immediate_parent.substr( + 0, immediate_parent.size() - chunks_suffix.size()); + tx_file = fmt::format("{}.tx", base_segment_path); + index_file = fmt::format("{}.index", base_segment_path); + } + } + + if (tx_file.has_value()) { + try { + auto sz = co_await ss::file_size(tx_file.value()); + co_await ss::remove_file(tx_file.value()); + result.deleted_size += sz; + this_segment_deleted_bytes += sz; + result.deleted_count += 1; + _current_cache_size -= sz; + _current_cache_objects -= 1; + } catch (std::filesystem::filesystem_error& e) { + if (e.code() != std::errc::no_such_file_or_directory) { + throw; + } + } + } + + if (index_file.has_value()) { + try { + auto sz = co_await ss::file_size(index_file.value()); + co_await ss::remove_file(index_file.value()); + result.deleted_size += sz; + this_segment_deleted_bytes += sz; + result.deleted_count += 1; + _current_cache_size -= sz; + _current_cache_objects -= 1; + } catch (std::filesystem::filesystem_error& e) { + if (e.code() != std::errc::no_such_file_or_directory) { + throw; + } + } + } + + // Remove key if possible to make sure there is no resource + // leak + _access_time_tracker.remove_timestamp(std::string_view(file_stat.path)); + + vlog( + cst_log.trace, + "trim: reclaimed(fast) {} bytes from {}", + this_segment_deleted_bytes, + file_stat.path); + } catch (const ss::gate_closed_exception&) { + // We are shutting down, stop 
iterating and propagate + throw; + } catch (const std::exception& e) { + vlog( + cst_log.error, + "trim: couldn't delete {}: {}.", + file_stat.path, + e.what()); + } + co_return result; +} + ss::future cache::trim_fast( const fragmented_vector& candidates, uint64_t size_to_delete, @@ -537,19 +634,17 @@ ss::future cache::trim_fast( trim_result result; - size_t candidate_i = 0; - while ( - candidate_i < candidates.size() - && (result.deleted_size < size_to_delete || result.deleted_count < objects_to_delete)) { - auto& file_stat = candidates[candidate_i++]; + // Reset carryover list + _last_trim_carryover = std::nullopt; + auto need_to_skip = [this](const file_list_item& file_stat) { if (is_trim_exempt(file_stat.path)) { - continue; + return true; } // skip tmp files since someone may be writing to it if (std::string_view(file_stat.path).ends_with(tmp_extension)) { - continue; + return true; } // Doesn't make sense to demote these independent of the segment @@ -558,97 +653,45 @@ ss::future cache::trim_fast( if ( std::string_view(file_stat.path).ends_with(".tx") || std::string_view(file_stat.path).ends_with(".index")) { - continue; + return true; } + return false; + }; - try { - uint64_t this_segment_deleted_bytes{0}; - - auto deleted_parents = co_await delete_file_and_empty_parents( - file_stat.path); - result.deleted_size += file_stat.size; - this_segment_deleted_bytes += file_stat.size; - _current_cache_size -= file_stat.size; - _current_cache_objects -= 1; - result.deleted_count += 1; - - // Determine whether we should delete indices along with the - // object we have just deleted - std::optional tx_file; - std::optional index_file; - - if (RE2::FullMatch(file_stat.path.data(), segment_expr)) { - // If this was a legacy whole-segment item, delete the index - // and tx file along with the segment - tx_file = fmt::format("{}.tx", file_stat.path); - index_file = fmt::format("{}.index", file_stat.path); - } else if (deleted_parents) { - auto immediate_parent = 
std::string( - std::filesystem::path(file_stat.path).parent_path()); - static constexpr std::string_view chunks_suffix{"_chunks"}; - if (immediate_parent.ends_with(chunks_suffix)) { - // We just deleted the last chunk from a _chunks segment - // directory. We may delete the index + tx state for - // that segment. - auto base_segment_path = immediate_parent.substr( - 0, immediate_parent.size() - chunks_suffix.size()); - tx_file = fmt::format("{}.tx", base_segment_path); - index_file = fmt::format("{}.index", base_segment_path); - } - } - - if (tx_file.has_value()) { - try { - auto sz = co_await ss::file_size(tx_file.value()); - co_await ss::remove_file(tx_file.value()); - result.deleted_size += sz; - this_segment_deleted_bytes += sz; - result.deleted_count += 1; - _current_cache_size -= sz; - _current_cache_objects -= 1; - } catch (std::filesystem::filesystem_error& e) { - if (e.code() != std::errc::no_such_file_or_directory) { - throw; - } - } - } + size_t candidate_i = 0; + while ( + candidate_i < candidates.size() + && (result.deleted_size < size_to_delete || result.deleted_count < objects_to_delete)) { + auto& file_stat = candidates[candidate_i++]; - if (index_file.has_value()) { - try { - auto sz = co_await ss::file_size(index_file.value()); - co_await ss::remove_file(index_file.value()); - result.deleted_size += sz; - this_segment_deleted_bytes += sz; - result.deleted_count += 1; - _current_cache_size -= sz; - _current_cache_objects -= 1; - } catch (std::filesystem::filesystem_error& e) { - if (e.code() != std::errc::no_such_file_or_directory) { - throw; - } - } - } + if (need_to_skip(file_stat)) { + continue; + } - // Remove key if possible to make sure there is no resource - // leak - _access_time_tracker.remove_timestamp( - std::string_view(file_stat.path)); + auto op_res = co_await this->remove_segment_full(file_stat); + result.deleted_count += op_res.deleted_count; + result.deleted_size += op_res.deleted_size; + } - vlog( - cst_log.trace, - "trim: 
reclaimed(fast) {} bytes from {}", - this_segment_deleted_bytes, - file_stat.path); - } catch (const ss::gate_closed_exception&) { - // We are shutting down, stop iterating and propagate - throw; - } catch (const std::exception& e) { - vlog( - cst_log.error, - "trim: couldn't delete {}: {}.", - file_stat.path, - e.what()); + auto max_carryover_files = config::shard_local_cfg() + .cloud_storage_cache_trim_carryover.value() + .value_or(0); + fragmented_vector tmp; + auto estimated_size = std::min( + static_cast(max_carryover_files), + candidates.size() - candidate_i); + tmp.reserve(estimated_size); + while (max_carryover_files > 0 && candidate_i < candidates.size()) { + const auto& fs = candidates[candidate_i++]; + if (need_to_skip(fs)) { + continue; } + max_carryover_files--; + tmp.push_back(fs); + } + + if (!tmp.empty()) { + _last_trim_carryover = std::move(tmp); } co_return result; @@ -669,6 +712,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { probe.exhaustive_trim(); trim_result result; + _last_trim_carryover = std::nullopt; + // Enumerate ALL files in the cache (as opposed to trim_fast that strips out // indices/tx/tmp files) auto [walked_cache_size, _filtered_out, candidates, _] @@ -1228,6 +1273,114 @@ bool cache::may_exceed_limits(uint64_t bytes, size_t objects) { && !would_fit_in_cache; } +ss::future +cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { + // During the normal trim we're doing the recursive directory walk to + // generate a exhaustive list of files stored in the cache. If we store very + // large number of files in the cache this operation could take long time. + // We have a limit for number of objects that the cache could support but + // it's often set to relatively high value. Also, when we reach the object + // count limit the cache blocks all new 'put' operations because it doesn't + // allow any overallocation in this case. 
+ // + // This creates a corner case when every trim is caused by the object count + // limit being reached. In this case the trim is blocking readers every + // time. + // + // The solution is to quickly delete objects without doing the full + // recursive directory walk and unblock the readers proactively allowing + // them object count to overshoot for very brief period of time. In order to + // be able to do this we need to have the list of candidates for deletion. + // Such list is stored in the _last_trim_carryover field. This is a list of + // files with oldest access times from the last directory walk. The + // carryover trim compares the access times from the carryover list to their + // actual access times from the access time tracker. All objects with + // matching access times wasn't accessed since the last trim and can be + // deleted. This doesn't change the LRU behavior since the + // _last_trim_carryover stores objects in LRU order. + trim_result result; + vlog( + cst_log.trace, + "trim carryover: list available {}", + _last_trim_carryover.has_value()); + + if (!_last_trim_carryover.has_value()) { + co_return result; + } + auto it = _last_trim_carryover->begin(); + for (; it < _last_trim_carryover->end(); it++) { + vlog( + cst_log.trace, + "carryover trim: check object {} ({})", + it->path, + it->size); + if ( + result.deleted_size >= delete_bytes + && result.deleted_count >= delete_objects) { + vlog( + cst_log.trace, + "carryover trim: stop, deleted {} / {}, requested to delete {} / " + "{}", + human::bytes(result.deleted_size), + result.deleted_count, + human::bytes(delete_bytes), + delete_objects); + break; + } + auto& file_stat = *it; + // Don't hit access time tracker file/tmp + if ( + is_trim_exempt(file_stat.path) + || std::string_view(file_stat.path).ends_with(tmp_extension)) { + continue; + } + // Both tx and index files are handled as part of the segment + // deletion. 
+ if ( + std::string_view(file_stat.path).ends_with(".tx") + || std::string_view(file_stat.path).ends_with(".index")) { + continue; + } + // Check that access time didn't change + auto rel_path = _cache_dir + / std::filesystem::relative( + std::filesystem::path(file_stat.path), _cache_dir); + auto estimate = _access_time_tracker.estimate_timestamp( + rel_path.native()); + if (estimate != file_stat.access_time) { + vlog( + cst_log.trace, + "carryover file {} was accessed ({}) since the last trim ({}), " + "ignoring", + rel_path.native(), + estimate->time_since_epoch().count(), + file_stat.access_time.time_since_epoch().count()); + // The file was accessed since we get the stats + continue; + } + auto op_res = co_await this->remove_segment_full(file_stat); + result.deleted_count += op_res.deleted_count; + result.deleted_size += op_res.deleted_size; + } + vlog( + cst_log.debug, + "carryover trim reclaimed {} bytes from {} files", + result.deleted_size, + result.deleted_count); + + if (it == _last_trim_carryover->end()) { + _last_trim_carryover = std::nullopt; + } else { + fragmented_vector tmp; + size_t estimate = _last_trim_carryover->end() - it; + tmp.reserve(estimate); + std::copy(it, _last_trim_carryover->end(), std::back_inserter(tmp)); + _last_trim_carryover = std::move(tmp); + } + + co_return result; +} + ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { vassert(ss::this_shard_id() == ss::shard_id{0}, "Only call on shard 0"); @@ -1238,12 +1391,6 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { co_return; } - // Slow path: register a pending need for bytes that will be used in - // clean_up_cache to make space available, and then proceed to cooperatively - // call clean_up_cache along with anyone else who is waiting. 
- _reservations_pending += bytes; - _reservations_pending_objects += objects; - vlog( cst_log.trace, "Out of space reserving {} bytes (size={}/{} " @@ -1258,6 +1405,45 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { try { auto units = co_await ss::get_units(_cleanup_sm, 1); + + if (_last_trim_carryover.has_value()) { + // Slow path: try to run carryover trim if we have data + // from the previous trim. + + auto short_term_hydrations_estimate + = config::shard_local_cfg().cloud_storage_max_connections() + * ss::smp::count; + + // Here we're trying to estimate how much space do we need to + // free to allow all TS resources to be used again to download + // data from S3. This is only a crude estimate. + auto trim_bytes = std::min( + config::shard_local_cfg().log_segment_size() + * short_term_hydrations_estimate / 3, + _max_bytes); + auto trim_objects = std::min( + short_term_hydrations_estimate * 3, _max_objects()); + + vlog( + cst_log.debug, + "Carryover trim list has {} elements, trying to remove {} bytes " + "and {} objects", + _last_trim_carryover->size(), + human::bytes(trim_bytes), + trim_objects); + + co_await trim_carryover(trim_bytes, trim_objects); + } else { + vlog(cst_log.debug, "Carryover trim list is empty"); + } + + // Slowest path: register a pending need for bytes that will be used in + // clean_up_cache to make space available, and then proceed to + // cooperatively call clean_up_cache along with anyone else who is + // waiting. 
+ _reservations_pending += bytes; + _reservations_pending_objects += objects; + while (!may_reserve_space(bytes, objects)) { bool may_exceed = may_exceed_limits(bytes, objects) && _last_trim_failed; diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 67a851596dc1f..bdba3bd34f042 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -253,6 +253,10 @@ class cache : public ss::peering_sharded_service { /// (only runs on shard 0) ss::future<> do_reserve_space(uint64_t, size_t); + /// Trim cache using results from the previous recursive directory walk + ss::future + trim_carryover(uint64_t delete_bytes, uint64_t delete_objects); + /// Return true if the sum of used space and reserved space is far enough /// below max size to accommodate a new reservation of `bytes` /// (only runs on shard 0) @@ -273,6 +277,11 @@ class cache : public ss::peering_sharded_service { /// update. void set_block_puts(bool); + /// Remove segment or chunk subdirectory with all its auxilary files (tx, + /// index) + ss::future + remove_segment_full(const file_list_item& file_stat); + std::filesystem::path _cache_dir; size_t _disk_size; config::binding _disk_reservation; @@ -339,6 +348,9 @@ class cache : public ss::peering_sharded_service { ss::condition_variable _block_puts_cond; friend class cache_test_fixture; + + // List of probable deletion candidates from the last trim. 
+ std::optional> _last_trim_carryover; }; } // namespace cloud_storage From fdf34981fac71b011a3c47e3adf86006e6d9da09 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Thu, 25 Apr 2024 19:22:27 +0000 Subject: [PATCH 04/10] cloud_storage: Add get_max_bytes/objects to cache --- src/v/cloud_storage/cache_service.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index bdba3bd34f042..7a44fdb5f79d0 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -177,6 +177,10 @@ class cache : public ss::peering_sharded_service { uint64_t get_usage_bytes() { return _current_cache_size; } + uint64_t get_max_bytes() { return _max_bytes; } + + uint64_t get_max_objects() { return _max_objects(); } + /// Administrative trim, that specifies its own limits instead of using /// the configured limits (skips throttling, and can e.g. trim to zero bytes /// if they want to) From 125659467d743a3821516b975efe4724bdd24da0 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 12:53:39 +0000 Subject: [PATCH 05/10] cloud_storage: Add test_carryover unit-test --- src/v/cloud_storage/tests/cache_test.cc | 83 +++++++++++++++++++ .../cloud_storage/tests/cache_test_fixture.h | 4 + 2 files changed, 87 insertions(+) diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 4acd78f3554c8..ca74475898713 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -9,13 +9,17 @@ */ #include "base/units.h" +#include "bytes/bytes.h" #include "bytes/iobuf.h" #include "bytes/iostream.h" #include "cache_test_fixture.h" #include "cloud_storage/access_time_tracker.h" #include "cloud_storage/cache_service.h" +#include "random/generators.h" +#include "ssx/sformat.h" #include "test_utils/fixture.h" #include "utils/file_io.h" +#include "utils/human.h" #include #include @@ -31,6 +35,8 @@ using namespace 
cloud_storage; +static ss::logger test_log("cache_test_logger"); + FIXTURE_TEST(put_creates_file, cache_test_fixture) { auto data_string = create_data_string('a', 1_MiB + 1_KiB); put_into_cache(data_string, KEY); @@ -538,3 +544,80 @@ FIXTURE_TEST(test_log_segment_cleanup, cache_test_fixture) { return !std::filesystem::exists(path); })); } + +FIXTURE_TEST(test_cache_carryover_trim, cache_test_fixture) { + std::string write_buf(1_MiB, ' '); + random_generators::fill_buffer_randomchars( + write_buf.data(), write_buf.size()); + size_t bytes_used = 0; + size_t num_objects = 0; + std::vector object_keys; + for (int i = 0; i < 10; i++) { + object_keys.emplace_back(fmt::format("test_{}.log.1", i)); + std::ofstream segment{CACHE_DIR / object_keys.back()}; + segment.write( + write_buf.data(), static_cast(write_buf.size())); + segment.flush(); + bytes_used += write_buf.size(); + num_objects++; + object_keys.emplace_back(fmt::format("test_{}.log.1.index", i)); + std::ofstream index{CACHE_DIR / object_keys.back()}; + index.write( + write_buf.data(), static_cast(write_buf.size())); + index.flush(); + bytes_used += write_buf.size(); + num_objects++; + object_keys.emplace_back(fmt::format("test_{}.log.1.tx", i)); + std::ofstream tx{CACHE_DIR / object_keys.back()}; + tx.write( + write_buf.data(), static_cast(write_buf.size())); + tx.flush(); + bytes_used += write_buf.size(); + num_objects++; + } + // Account all files in the cache (30 MiB). + clean_up_at_start().get(); + for (const auto& key : object_keys) { + // Touch every object so they have access times assigned to them + auto item = sharded_cache.local().get(key).get(); + item->body.close().get(); + } + + // Force trim to create a carryover list. + // Nothing is deleted by the trim. 
+ vlog(test_log.info, "Initial trim"); + auto cache_size_target_bytes = bytes_used; + auto cache_size_target_objects = num_objects; + trim_cache(cache_size_target_bytes, cache_size_target_objects); + + // Start trim using only carryover data + auto before_bytes = sharded_cache.local().get_usage_bytes(); + auto before_objects = sharded_cache.local().get_usage_objects(); + vlog( + test_log.info, + "Trim {}, {} bytes used, {} objects", + human::bytes(bytes_used), + human::bytes(before_bytes), + before_objects); + BOOST_REQUIRE(before_bytes > 0); + BOOST_REQUIRE(before_objects > 0); + + // Note that 'trim_carryover' accepts number of bytes/objects + // that has to be deleted. This is the opposite of how 'trim' + // method behaves which accepts target size for the cache. + // This behavior is similar to 'trim_fast'. + auto bytes_to_delete = before_bytes; + auto objects_to_delete = before_objects; + trim_carryover(bytes_to_delete, objects_to_delete); + + // At this point we should be able to delete all objects + auto after_bytes = sharded_cache.local().get_usage_bytes(); + auto after_objects = sharded_cache.local().get_usage_objects(); + vlog( + test_log.info, + "After trim {} bytes used, {} objects", + human::bytes(after_bytes), + after_objects); + BOOST_REQUIRE_EQUAL(after_bytes, 0); + BOOST_REQUIRE_EQUAL(after_objects, 0); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index e7a235fc268ad..4cc8a1311720f 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -119,6 +119,10 @@ class cache_test_fixture { }) .get(); } + + void trim_carryover(uint64_t size_limit, uint64_t object_limit) { + sharded_cache.local().trim_carryover(size_limit, object_limit).get(); + } }; } // namespace cloud_storage From e1c30bc6e18a08de9fb74d4331450488642b5b53 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 15:01:58 +0000 Subject: [PATCH 06/10] 
cloud_storage: Express carryover limit in bytes Change the configuration parameter and treat the value as number of bytes that we can use to store carryover data. --- src/v/cloud_storage/cache_service.cc | 13 +++++++------ src/v/config/configuration.cc | 13 +++++++------ src/v/config/configuration.h | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 2ec7b2a35fdb3..58cc2055fe1f2 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -673,20 +673,21 @@ ss::future cache::trim_fast( result.deleted_size += op_res.deleted_size; } - auto max_carryover_files = config::shard_local_cfg() - .cloud_storage_cache_trim_carryover.value() - .value_or(0); + ssize_t max_carryover_bytes + = config::shard_local_cfg() + .cloud_storage_cache_trim_carryover_bytes.value(); fragmented_vector tmp; auto estimated_size = std::min( - static_cast(max_carryover_files), + static_cast(max_carryover_bytes), candidates.size() - candidate_i); tmp.reserve(estimated_size); - while (max_carryover_files > 0 && candidate_i < candidates.size()) { + while (max_carryover_bytes > 0 && candidate_i < candidates.size()) { const auto& fs = candidates[candidate_i++]; if (need_to_skip(fs)) { continue; } - max_carryover_files--; + max_carryover_bytes -= static_cast( + sizeof(fs) + fs.path.size()); tmp.push_back(fs); } diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 20a78defa49f7..edf8f0f297271 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2332,17 +2332,18 @@ configuration::configuration() // Enough for a >1TiB cache of 16MiB objects. Decrease this in case // of issues with trim performance. 
100000) - , cloud_storage_cache_trim_carryover( + , cloud_storage_cache_trim_carryover_bytes( *this, - "cloud_storage_cache_trim_carryover", + "cloud_storage_cache_trim_carryover_bytes", "The cache performs a recursive directory inspection during the cache " "trim. The information obtained during the inspection can be carried " "over to the next trim operation. This parameter sets a limit on the " - "number of objects that can be carried over from one trim to next, and " - "allows cache to quickly unblock readers before starting the directory " - "inspection.", + "memory occupied by objects that can be carried over from one trim to " + "next, and allows cache to quickly unblock readers before starting the " + "directory inspection.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, - 512) + // This roughly translates to around 4000 carryover file names + 1_MiB) , cloud_storage_cache_check_interval_ms( *this, "cloud_storage_cache_check_interval", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 010b1c693a0eb..784d4c76ba061 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -425,7 +425,7 @@ struct configuration final : public config_store { bounded_property, numeric_bounds> cloud_storage_cache_size_percent; property cloud_storage_cache_max_objects; - property> cloud_storage_cache_trim_carryover; + property cloud_storage_cache_trim_carryover_bytes; property cloud_storage_cache_check_interval_ms; property> cloud_storage_max_segment_readers_per_shard; From 0f84fdbbbecd80221ee199de0850a00f16de0789 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 15:03:07 +0000 Subject: [PATCH 07/10] cloud_storage: Reserve memory units for carryover Reserve memory units for the carryover mechanism in materialized_resources. 
--- src/v/cloud_storage/materialized_resources.cc | 50 ++++++++++++++++++- src/v/cloud_storage/materialized_resources.h | 3 ++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/v/cloud_storage/materialized_resources.cc b/src/v/cloud_storage/materialized_resources.cc index 1eb804d53894d..43b4c6413a5b0 100644 --- a/src/v/cloud_storage/materialized_resources.cc +++ b/src/v/cloud_storage/materialized_resources.cc @@ -177,7 +177,9 @@ materialized_resources::materialized_resources() , _throughput_shard_limit_config( config::shard_local_cfg().cloud_storage_max_throughput_per_shard.bind()) , _relative_throughput( - config::shard_local_cfg().cloud_storage_throughput_limit_percent.bind()) { + config::shard_local_cfg().cloud_storage_throughput_limit_percent.bind()) + , _cache_carryover_bytes(config::shard_local_cfg() + .cloud_storage_cache_trim_carryover_bytes.bind()) { auto update_max_mem = [this]() { // Update memory capacity to accommodate new max number of segment // readers @@ -203,6 +205,52 @@ materialized_resources::materialized_resources() }); }); + if (ss::this_shard_id() == 0) { + // Take into account number of bytes used by cache carryover mechanism. + // The cache doesn't have access to 'materialized_resources' because + // otherwise it'd create a dependency cycle. + _carryover_units = _mem_units.try_get_units(_cache_carryover_bytes()); + vlog( + cst_log.info, + "{} units reserved for cache trim carryover mechanism", + _carryover_units.has_value() ? _carryover_units->count() : 0); + + _cache_carryover_bytes.watch([this] { + // We're using best effort approach here. Under memory pressure we + // might not be able to change reservation + auto current_units = _carryover_units.has_value() + ? _carryover_units->count() + : 0; + auto upd = _cache_carryover_bytes(); + if (upd < current_units) { + // Free units that represent memory used by carryover cache + // trim. It's guaranteed that optional is not null. 
+ _carryover_units->return_units(current_units - upd); + } else { + // Acquire new units + auto tmp = _mem_units.try_get_units(upd - current_units); + if (tmp.has_value()) { + if (_carryover_units.has_value()) { + _carryover_units->adopt(std::move(tmp.value())); + } else { + _carryover_units = std::move(tmp.value()); + } + } else { + vlog( + cst_log.info, + "Failed to reserve {} units for the cache carryover " + "mechanism because tiered-storage is likely under memory " + "pressure", + upd - current_units); + } + } + vlog( + cst_log.info, + "{} units reserved for cache trim carryover mechanism", + _carryover_units.has_value() ? _carryover_units->count() : 0); + }); + } + auto reset_tp = [this] { ssx::spawn_with_gate(_gate, [this] { return update_throughput(); }); }; diff --git a/src/v/cloud_storage/materialized_resources.h b/src/v/cloud_storage/materialized_resources.h index cb20e5eaae35d..ebbba599b8c18 100644 --- a/src/v/cloud_storage/materialized_resources.h +++ b/src/v/cloud_storage/materialized_resources.h @@ -179,6 +179,9 @@ class materialized_resources { config::binding> _relative_throughput; bool _throttling_disabled{false}; std::optional _device_throughput; + config::binding _cache_carryover_bytes; + // Memory reserved for cache carryover mechanism + std::optional _carryover_units; }; } // namespace cloud_storage From 6b57e0c167eb4da482b5122f482f3135f070ab02 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 15:08:08 +0000 Subject: [PATCH 08/10] cloud_storage: Add carryover cache metric --- src/v/cloud_storage/cache_probe.cc | 5 +++++ src/v/cloud_storage/cache_probe.h | 2 ++ src/v/cloud_storage/cache_service.cc | 1 + 3 files changed, 8 insertions(+) diff --git a/src/v/cloud_storage/cache_probe.cc b/src/v/cloud_storage/cache_probe.cc index a1e7915ec5d80..5bd06b72fb7c2 100644 --- a/src/v/cloud_storage/cache_probe.cc +++ b/src/v/cloud_storage/cache_probe.cc @@ -102,6 +102,11 @@ cache_probe::cache_probe() { "Number of times we couldn't free 
enough space with a fast " "trim and had to fall back to a slower exhaustive trim.")) .aggregate(aggregate_labels), + sm::make_counter( + "carryover_trims", + [this] { return _carryover_trims; }, + sm::description("Number of times we invoked carryover trim.")) + .aggregate(aggregate_labels), sm::make_counter( "failed_trims", [this] { return _failed_trims; }, diff --git a/src/v/cloud_storage/cache_probe.h b/src/v/cloud_storage/cache_probe.h index 3cc46f602e768..791d211b5f031 100644 --- a/src/v/cloud_storage/cache_probe.h +++ b/src/v/cloud_storage/cache_probe.h @@ -39,6 +39,7 @@ class cache_probe { void fast_trim() { ++_fast_trims; } void exhaustive_trim() { ++_exhaustive_trims; } + void carryover_trim() { ++_carryover_trims; } void failed_trim() { ++_failed_trims; } private: @@ -55,6 +56,7 @@ class cache_probe { int64_t _fast_trims{0}; int64_t _exhaustive_trims{0}; + int64_t _carryover_trims{0}; int64_t _failed_trims{0}; metrics::internal_metric_groups _metrics; diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 58cc2055fe1f2..c94562fcc060e 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -1308,6 +1308,7 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { if (!_last_trim_carryover.has_value()) { co_return result; } + probe.carryover_trim(); auto it = _last_trim_carryover->begin(); for (; it < _last_trim_carryover->end(); it++) { vlog( From 61a09b47099e2422b86c401e11f5e5c6cfba9157 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 16:08:50 +0000 Subject: [PATCH 09/10] Fixup --- src/v/config/configuration.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index edf8f0f297271..3bde53cea6c78 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2342,8 +2342,8 @@ configuration::configuration() "next, and allows cache to quickly 
unblock readers before starting the " "directory inspection.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, - // This roughly translates to around 4000 carryover file names - 1_MiB) + // This roughly translates to around 1000 carryover file names + 256_KiB) , cloud_storage_cache_check_interval_ms( *this, "cloud_storage_cache_check_interval", From 9f1b51b6a7e3455f3bc1c25965f3d19bb657dada Mon Sep 17 00:00:00 2001 From: Evgeny Lazin Date: Fri, 26 Apr 2024 16:08:54 +0000 Subject: [PATCH 10/10] cloud_storage: Run trim in the background If the carryover trim was able to release enough space, start the trim in the background and return early. This unblocks the hydration that reserved space and triggered the trim. We need to run the normal trim anyway to avoid the corner case in which the carryover list becomes empty and we have to block readers for the duration of the full trim. --- src/v/cloud_storage/cache_service.cc | 99 ++++++++++++++++++---------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index c94562fcc060e..f55c74c15b789 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -1405,44 +1405,73 @@ ss::future<> cache::do_reserve_space(uint64_t bytes, size_t objects) { _reservations_pending, _reservations_pending_objects); - try { - auto units = co_await ss::get_units(_cleanup_sm, 1); - - if (_last_trim_carryover.has_value()) { - // Slow path: try to run carryover trim if we have data - // from the previous trim. - - auto short_term_hydrations_estimate - = config::shard_local_cfg().cloud_storage_max_connections() - * ss::smp::count; - - // Here we're trying to estimate how much space do we need to - // free to allow all TS resources to be used again to download - // data from S3. This is only a crude estimate. 
- auto trim_bytes = std::min( - config::shard_local_cfg().log_segment_size() - * short_term_hydrations_estimate / 3, - _max_bytes); - auto trim_objects = std::min( - short_term_hydrations_estimate * 3, _max_objects()); + auto units = co_await ss::get_units(_cleanup_sm, 1); - vlog( - cst_log.debug, - "Carryover trim list has {} elements, trying to remove {} bytes " - "and {} objects", - _last_trim_carryover->size(), - human::bytes(trim_bytes), - trim_objects); + // Situation may change after a scheduling point. Another fiber could + // trigger carryover trim which released some resources. Exit early in this + // case. + if (may_reserve_space(bytes, objects)) { + _reserved_cache_size += bytes; + _reserved_cache_objects += objects; + co_return; + } - co_await trim_carryover(trim_bytes, trim_objects); - } else { - vlog(cst_log.debug, "Carryover trim list is empty"); - } + // Do not increment _reservations_pending* before carryover trim is + // completed. + if (_last_trim_carryover.has_value()) { + // Slow path: try to run carryover trim if we have data + // from the previous trim. + + auto short_term_hydrations_estimate + = config::shard_local_cfg().cloud_storage_max_connections() + * ss::smp::count; + + // Here we're trying to estimate how much space do we need to + // free to allow all TS resources to be used again to download + // data from S3. This is only a crude estimate. + auto trim_bytes = std::min( + config::shard_local_cfg().log_segment_size() + * short_term_hydrations_estimate / 3, + _max_bytes); + auto trim_objects = std::min( + short_term_hydrations_estimate * 3, _max_objects()); - // Slowest path: register a pending need for bytes that will be used in - // clean_up_cache to make space available, and then proceed to - // cooperatively call clean_up_cache along with anyone else who is - // waiting. 
+ vlog( + cst_log.debug, + "Carryover trim list has {} elements, trying to remove {} bytes " + "and {} objects", + _last_trim_carryover->size(), + human::bytes(trim_bytes), + trim_objects); + + co_await trim_carryover(trim_bytes, trim_objects); + } else { + vlog(cst_log.debug, "Carryover trim list is empty"); + } + + if (may_reserve_space(bytes, objects)) { + _reserved_cache_size += bytes; + _reserved_cache_objects += objects; + // Carryover trim released enough space for this fiber to continue. But + // we are starting the trim in the background to release more space and + // refresh the carryover list. Without this subsequent 'reserve_space' + // calls will be removing elements from the carryover list until it's + // empty. After that the blocking trim will be forced and the readers + // will be blocked for the duration of the trim. To avoid this we need + // to run trim in the background even if the fiber is unblocked. + // We want number of full trims to match number of carryover trims. + vlog(cst_log.debug, "Spawning background trim_throttled"); + ssx::spawn_with_gate(_gate, [this, u = std::move(units)]() mutable { + return trim_throttled().finally([u = std::move(u)] {}); + }); + co_return; + } + + // Slowest path: register a pending need for bytes that will be used in + // clean_up_cache to make space available, and then proceed to + // cooperatively call clean_up_cache along with anyone else who is + // waiting. + try { _reservations_pending += bytes; _reservations_pending_objects += objects;