diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 34e68f5f3399f..eb04b16314dd3 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -75,7 +75,8 @@ cache::cache( config::binding disk_reservation, config::binding max_bytes_cfg, config::binding> max_percent, - config::binding max_objects) noexcept + config::binding max_objects, + config::binding walk_concurrency) noexcept : _cache_dir(std::move(cache_dir)) , _disk_size(disk_size) , _disk_reservation(std::move(disk_reservation)) @@ -83,6 +84,7 @@ cache::cache( , _max_percent(std::move(max_percent)) , _max_bytes(_max_bytes_cfg()) , _max_objects(std::move(max_objects)) + , _walk_concurrency(std::move(walk_concurrency)) , _cnt(0) , _total_cleaned(0) { if (ss::this_shard_id() == ss::shard_id{0}) { @@ -191,7 +193,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; } ss::future<> cache::clean_up_at_start() { auto guard = _gate.hold(); auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs] - = co_await _walker.walk(_cache_dir.native(), _access_time_tracker); + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); vassert( filtered_out_files == 0, @@ -306,7 +309,10 @@ ss::future<> cache::trim( auto guard = _gate.hold(); auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] = co_await _walker.walk( - _cache_dir.native(), _access_time_tracker, [](std::string_view path) { + _cache_dir.native(), + _access_time_tracker, + _walk_concurrency(), + [](std::string_view path) { return !( std::string_view(path).ends_with(".tx") || std::string_view(path).ends_with(".index")); @@ -724,7 +730,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // Enumerate ALL files in the cache (as opposed to trim_fast that strips out // indices/tx/tmp files) auto [walked_cache_size, _filtered_out, candidates, _] - = co_await _walker.walk(_cache_dir.native(), _access_time_tracker); + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); vlog( cst_log.debug, diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index b0f2f45f3d97a..330ace4a73959 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -108,7 +108,8 @@ class cache : public ss::peering_sharded_service { config::binding, config::binding, config::binding>, - config::binding) noexcept; + config::binding, + config::binding) noexcept; cache(const cache&) = delete; cache(cache&& rhs) = delete; @@ -301,6 +302,7 @@ class cache : public ss::peering_sharded_service { config::binding> _max_percent; uint64_t _max_bytes; config::binding _max_objects; + config::binding _walk_concurrency; void update_max_bytes(); ss::abort_source _as; diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index 9931498838125..0035fe265efaa 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -124,7 +124,10 @@ namespace cloud_storage { ss::future recursive_directory_walker::walk( ss::sstring start_dir, const access_time_tracker& tracker, + uint16_t max_concurrency, std::optional collect_filter) { + vassert(max_concurrency > 0, "Max concurrency must be greater than 0"); + auto guard = _gate.hold(); watchdog wd1m(std::chrono::seconds(60), [] { @@ -157,8 +160,6 @@ ss::future recursive_directory_walker::walk( // // Empirical testing shows that this value is a good balance between // performance and resource usage. - const size_t max_concurrency = 1000; - std::vector targets; targets.reserve(max_concurrency); diff --git a/src/v/cloud_storage/recursive_directory_walker.h b/src/v/cloud_storage/recursive_directory_walker.h index 13892550a676a..080660cfd502a 100644 --- a/src/v/cloud_storage/recursive_directory_walker.h +++ b/src/v/cloud_storage/recursive_directory_walker.h @@ -46,6 +46,7 @@ class recursive_directory_walker { ss::future walk( ss::sstring start_dir, const access_time_tracker& tracker, + uint16_t max_concurrency, std::optional collect_filter = std::nullopt); private: diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 4cc8a1311720f..7d923875428ec 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -59,7 +59,8 @@ class cache_test_fixture { config::mock_binding(0.0), config::mock_binding(1_MiB + 500_KiB), config::mock_binding>(std::nullopt), - config::mock_binding(100000)) + config::mock_binding(100000), + config::mock_binding(3)) .get(); sharded_cache .invoke_on_all([](cloud_storage::cache& c) { return c.start(); }) diff --git a/src/v/cloud_storage/tests/cloud_storage_fixture.h b/src/v/cloud_storage/tests/cloud_storage_fixture.h index 89cd5e685d600..cb0b5225cb25f 100644 --- a/src/v/cloud_storage/tests/cloud_storage_fixture.h +++ b/src/v/cloud_storage/tests/cloud_storage_fixture.h @@ -49,7 +49,8 @@ struct cloud_storage_fixture : s3_imposter_fixture { config::mock_binding(0.0), config::mock_binding(1024 * 1024 * 1024), config::mock_binding>(std::nullopt), - config::mock_binding(100000)) + config::mock_binding(100000), + config::mock_binding(3)) .get(); cache.invoke_on_all([](cloud_storage::cache& c) { return c.start(); }) diff --git a/src/v/cloud_storage/tests/directory_walker_test.cc b/src/v/cloud_storage/tests/directory_walker_test.cc index 3d8f713458098..4c6601ef0545b 100644 --- a/src/v/cloud_storage/tests/directory_walker_test.cc +++ b/src/v/cloud_storage/tests/directory_walker_test.cc @@ -62,7 +62,7 @@ SEASTAR_THREAD_TEST_CASE(one_level) { file2.close().get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); auto expect = std::set{ file_path1.native(), file_path2.native()}; @@ -96,7 +96,7 @@ SEASTAR_THREAD_TEST_CASE(three_levels) { file3.close().get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 3); @@ -118,7 +118,7 @@ SEASTAR_THREAD_TEST_CASE(no_files) { ss::recursive_touch_directory(dir2.native()).get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0); @@ -130,7 +130,7 @@ SEASTAR_THREAD_TEST_CASE(empty_dir) { const std::filesystem::path target_dir = tmpdir.get_path(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0); @@ -184,6 +184,7 @@ SEASTAR_THREAD_TEST_CASE(total_size_correct) { .walk( target_dir.native(), tracker, + 3, [](std::string_view path) { return !( std::string_view(path).ends_with(".tx") diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index df21a2b34834c..5b338dedc2f3c 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2392,6 +2392,16 @@ configuration::configuration() "elapsed", {.visibility = visibility::tunable}, 5s) + , cloud_storage_cache_trim_walk_concurrency( + *this, + "cloud_storage_cache_trim_walk_concurrency", + "The maximum number of concurrent tasks launched for directory walk " + "during cache trimming. A higher number allows cache trimming to run " + "faster but can cause latency spikes due to increased pressure on I/O " + "subsystem and syscall threads.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 1, + {.min = 1, .max = 1000}) , cloud_storage_max_segment_readers_per_shard( *this, "cloud_storage_max_segment_readers_per_shard", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index a7a4ce71035c6..2d17eb533f9b7 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -431,6 +431,7 @@ struct configuration final : public config_store { property cloud_storage_cache_max_objects; property cloud_storage_cache_trim_carryover_bytes; property cloud_storage_cache_check_interval_ms; + bounded_property cloud_storage_cache_trim_walk_concurrency; property> cloud_storage_max_segment_readers_per_shard; property> diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index acf7d1b732f43..8ab7b447c2d9d 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1688,6 +1688,10 @@ void application::wire_up_redpanda_services( ss::sharded_parameter([] { return config::shard_local_cfg() .cloud_storage_cache_max_objects.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .cloud_storage_cache_trim_walk_concurrency.bind(); })) .get();