diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 34e68f5f3399f..eb04b16314dd3 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -75,7 +75,8 @@ cache::cache( config::binding disk_reservation, config::binding max_bytes_cfg, config::binding> max_percent, - config::binding max_objects) noexcept + config::binding max_objects, + config::binding walk_concurrency) noexcept : _cache_dir(std::move(cache_dir)) , _disk_size(disk_size) , _disk_reservation(std::move(disk_reservation)) @@ -83,6 +84,7 @@ cache::cache( , _max_percent(std::move(max_percent)) , _max_bytes(_max_bytes_cfg()) , _max_objects(std::move(max_objects)) + , _walk_concurrency(std::move(walk_concurrency)) , _cnt(0) , _total_cleaned(0) { if (ss::this_shard_id() == ss::shard_id{0}) { @@ -191,7 +193,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; } ss::future<> cache::clean_up_at_start() { auto guard = _gate.hold(); auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs] - = co_await _walker.walk(_cache_dir.native(), _access_time_tracker); + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); vassert( filtered_out_files == 0, @@ -306,7 +309,10 @@ ss::future<> cache::trim( auto guard = _gate.hold(); auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] = co_await _walker.walk( - _cache_dir.native(), _access_time_tracker, [](std::string_view path) { + _cache_dir.native(), + _access_time_tracker, + _walk_concurrency(), + [](std::string_view path) { return !( std::string_view(path).ends_with(".tx") || std::string_view(path).ends_with(".index")); @@ -724,7 +730,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // Enumerate ALL files in the cache (as opposed to trim_fast that strips out // indices/tx/tmp files) auto [walked_cache_size, _filtered_out, candidates, _] - = co_await _walker.walk(_cache_dir.native(), _access_time_tracker); + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); vlog( cst_log.debug, diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index b0f2f45f3d97a..330ace4a73959 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -108,7 +108,8 @@ class cache : public ss::peering_sharded_service { config::binding, config::binding, config::binding>, - config::binding) noexcept; + config::binding, + config::binding) noexcept; cache(const cache&) = delete; cache(cache&& rhs) = delete; @@ -301,6 +302,7 @@ class cache : public ss::peering_sharded_service { config::binding> _max_percent; uint64_t _max_bytes; config::binding _max_objects; + config::binding _walk_concurrency; void update_max_bytes(); ss::abort_source _as; diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index 0d432b31c5aa1..e6c3cc5f57e14 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -40,7 +41,6 @@ struct walk_accumulator { , filter(std::move(collect_filter)) {} ss::future<> visit(ss::sstring const& target, ss::directory_entry entry) { - seen_dentries = true; auto entry_path = fmt::format("{}/{}", target, entry.name); if (entry.type && entry.type == ss::directory_entry_type::regular) { auto file_stats = co_await ss::file_stat(entry_path); @@ -67,33 +67,67 @@ struct walk_accumulator { } else if ( entry.type && entry.type == ss::directory_entry_type::directory) { vlog(cst_log.debug, "Dir found {}", entry_path); - dirlist.push_front(entry_path); + dirlist.emplace_front(entry_path); } } bool empty() const { return dirlist.empty(); } ss::sstring pop() { - auto r = dirlist.back(); + auto r = std::move(dirlist.back()); dirlist.pop_back(); return r; } - void reset_seen_dentries() { seen_dentries = false; } - const access_time_tracker& tracker; - bool seen_dentries{false}; std::deque dirlist; std::optional filter; fragmented_vector files; uint64_t current_cache_size{0}; size_t filtered_out_files{0}; }; +} // namespace cloud_storage + +namespace { +ss::future<> walker_process_directory( + const ss::sstring& start_dir, + ss::sstring target, + cloud_storage::walk_accumulator& state, + fragmented_vector& empty_dirs) { + try { + ss::file target_dir = co_await open_directory(target); + + bool seen_dentries = false; + co_await target_dir + .list_directory( + [&state, &target, &seen_dentries](ss::directory_entry entry) { + seen_dentries = true; + return state.visit(target, std::move(entry)); + }) + .done() + .finally([target_dir]() mutable { return target_dir.close(); }); + + if (unlikely(!seen_dentries) && target != start_dir) { + empty_dirs.push_back(target); + } + } catch (std::filesystem::filesystem_error& e) { + if (e.code() == std::errc::no_such_file_or_directory) { + // skip this directory, move to the next one + } else { + throw; + } + } +} +} // namespace +namespace cloud_storage { ss::future recursive_directory_walker::walk( ss::sstring start_dir, const access_time_tracker& tracker, + uint16_t max_concurrency, std::optional collect_filter) { + vassert(max_concurrency > 0, "Max concurrency must be greater than 0"); + auto guard = _gate.hold(); watchdog wd1m(std::chrono::seconds(60), [] { @@ -108,36 +142,49 @@ ss::future recursive_directory_walker::walk( fragmented_vector empty_dirs; + // Listing directories involves blocking I/O which in Seastar is serviced by + // a dedicated thread pool and workqueue. When listing directories with + // large number of sub-directories this leads to a large queue of work items + // for each of which we have to incur the cost of task switching. Even for a + // lightly loaded reactor, it can take up to 1ms for a full round-trip. The + // round-trip time is ~100x the syscall duration. With 2 level directory + // structure, 100K files, 2 getdents64 calls per directory this adds up to + // 400K syscalls and 6m of wall time. + // + // By running the directory listing in parallel we also parallelize the + // round-trip overhead. This leads to a significant speedup in the + // directory listing phase. + // + // Limit the number of concurrent directory reads to avoid running out of + // file descriptors. + // + // Empirical testing shows that this value is a good balance between + // performance and resource usage. + std::vector targets; + targets.reserve(max_concurrency); + while (!state.empty()) { - auto target = state.pop(); - vassert( - std::string_view(target).starts_with(start_dir), - "Looking at directory {}, which is outside of initial dir " - "{}.", - target, - start_dir); - - try { - ss::file target_dir = co_await open_directory(target); - - state.reset_seen_dentries(); - co_await target_dir - .list_directory([&state, &target](ss::directory_entry entry) { - return state.visit(target, std::move(entry)); - }) - .done() - .finally([target_dir]() mutable { return target_dir.close(); }); - - if (unlikely(!state.seen_dentries) && target != start_dir) { - empty_dirs.push_back(target); - } - } catch (std::filesystem::filesystem_error& e) { - if (e.code() == std::errc::no_such_file_or_directory) { - // skip this directory, move to the ext one - } else { - throw; - } + targets.clear(); + + auto concurrency = std::min( + state.dirlist.size(), size_t(max_concurrency)); + + for (size_t i = 0; i < concurrency; ++i) { + auto target = state.pop(); + vassert( + std::string_view(target).starts_with(start_dir), + "Looking at directory {}, which is outside of initial dir " + "{}.", + target, + start_dir); + targets.push_back(std::move(target)); } + + co_await ss::parallel_for_each( + targets, [&start_dir, &state, &empty_dirs](ss::sstring target) { + return walker_process_directory( + start_dir, std::move(target), state, empty_dirs); + }); } co_return walk_result{ diff --git a/src/v/cloud_storage/recursive_directory_walker.h b/src/v/cloud_storage/recursive_directory_walker.h index 13892550a676a..080660cfd502a 100644 --- a/src/v/cloud_storage/recursive_directory_walker.h +++ b/src/v/cloud_storage/recursive_directory_walker.h @@ -46,6 +46,7 @@ class recursive_directory_walker { ss::future walk( ss::sstring start_dir, const access_time_tracker& tracker, + uint16_t max_concurrency, std::optional collect_filter = std::nullopt); private: diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 4cc8a1311720f..7d923875428ec 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -59,7 +59,8 @@ class cache_test_fixture { config::mock_binding(0.0), config::mock_binding(1_MiB + 500_KiB), config::mock_binding>(std::nullopt), - config::mock_binding(100000)) + config::mock_binding(100000), + config::mock_binding(3)) .get(); sharded_cache .invoke_on_all([](cloud_storage::cache& c) { return c.start(); }) diff --git a/src/v/cloud_storage/tests/cloud_storage_fixture.h b/src/v/cloud_storage/tests/cloud_storage_fixture.h index 89cd5e685d600..cb0b5225cb25f 100644 --- a/src/v/cloud_storage/tests/cloud_storage_fixture.h +++ b/src/v/cloud_storage/tests/cloud_storage_fixture.h @@ -49,7 +49,8 @@ struct cloud_storage_fixture : s3_imposter_fixture { config::mock_binding(0.0), config::mock_binding(1024 * 1024 * 1024), config::mock_binding>(std::nullopt), - config::mock_binding(100000)) + config::mock_binding(100000), + config::mock_binding(3)) .get(); cache.invoke_on_all([](cloud_storage::cache& c) { return c.start(); }) diff --git a/src/v/cloud_storage/tests/directory_walker_test.cc b/src/v/cloud_storage/tests/directory_walker_test.cc index 3b5c1eac343c3..4c6601ef0545b 100644 --- a/src/v/cloud_storage/tests/directory_walker_test.cc +++ b/src/v/cloud_storage/tests/directory_walker_test.cc @@ -62,7 +62,7 @@ SEASTAR_THREAD_TEST_CASE(one_level) { file2.close().get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); auto expect = std::set{ file_path1.native(), file_path2.native()}; @@ -81,6 +81,8 @@ SEASTAR_THREAD_TEST_CASE(three_levels) { ss::recursive_touch_directory((target_dir / "a").native()).get(); ss::recursive_touch_directory((target_dir / "b" / "c").native()).get(); + ss::recursive_touch_directory((target_dir / "b" / "c-empty").native()) + .get(); auto flags = ss::open_flags::wo | ss::open_flags::create | ss::open_flags::exclusive; @@ -94,10 +96,11 @@ SEASTAR_THREAD_TEST_CASE(three_levels) { file3.close().get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 3); + BOOST_REQUIRE_EQUAL(result.empty_dirs.size(), 1); auto expect = std::set{ file_path1.native(), file_path2.native(), file_path3.native()}; @@ -115,7 +118,7 @@ SEASTAR_THREAD_TEST_CASE(no_files) { ss::recursive_touch_directory(dir2.native()).get(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0); @@ -127,7 +130,7 @@ SEASTAR_THREAD_TEST_CASE(empty_dir) { const std::filesystem::path target_dir = tmpdir.get_path(); access_time_tracker tracker; - auto result = _walker.walk(target_dir.native(), tracker).get(); + auto result = _walker.walk(target_dir.native(), tracker, 3).get(); BOOST_REQUIRE_EQUAL(result.cache_size, 0); BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0); @@ -181,6 +184,7 @@ SEASTAR_THREAD_TEST_CASE(total_size_correct) { .walk( target_dir.native(), tracker, + 3, [](std::string_view path) { return !( std::string_view(path).ends_with(".tx") diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index df21a2b34834c..5b338dedc2f3c 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2392,6 +2392,16 @@ configuration::configuration() "elapsed", {.visibility = visibility::tunable}, 5s) + , cloud_storage_cache_trim_walk_concurrency( + *this, + "cloud_storage_cache_trim_walk_concurrency", + "The maximum number of concurrent tasks launched for directory walk " + "during cache trimming. A higher number allows cache trimming to run " + "faster but can cause latency spikes due to increased pressure on I/O " + "subsystem and syscall threads.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 1, + {.min = 1, .max = 1000}) , cloud_storage_max_segment_readers_per_shard( *this, "cloud_storage_max_segment_readers_per_shard", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index a7a4ce71035c6..2d17eb533f9b7 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -431,6 +431,7 @@ struct configuration final : public config_store { property cloud_storage_cache_max_objects; property cloud_storage_cache_trim_carryover_bytes; property cloud_storage_cache_check_interval_ms; + bounded_property cloud_storage_cache_trim_walk_concurrency; property> cloud_storage_max_segment_readers_per_shard; property> diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index acf7d1b732f43..8ab7b447c2d9d 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1688,6 +1688,10 @@ void application::wire_up_redpanda_services( ss::sharded_parameter([] { return config::shard_local_cfg() .cloud_storage_cache_max_objects.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .cloud_storage_cache_trim_walk_concurrency.bind(); })) .get();