Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] cloud_storage: concurrent directory walking #19815

15 changes: 11 additions & 4 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ cache::cache(
config::binding<double> disk_reservation,
config::binding<uint64_t> max_bytes_cfg,
config::binding<std::optional<double>> max_percent,
config::binding<uint32_t> max_objects) noexcept
config::binding<uint32_t> max_objects,
config::binding<uint16_t> walk_concurrency) noexcept
: _cache_dir(std::move(cache_dir))
, _disk_size(disk_size)
, _disk_reservation(std::move(disk_reservation))
, _max_bytes_cfg(std::move(max_bytes_cfg))
, _max_percent(std::move(max_percent))
, _max_bytes(_max_bytes_cfg())
, _max_objects(std::move(max_objects))
, _walk_concurrency(std::move(walk_concurrency))
, _cnt(0)
, _total_cleaned(0) {
if (ss::this_shard_id() == ss::shard_id{0}) {
Expand Down Expand Up @@ -191,7 +193,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; }
ss::future<> cache::clean_up_at_start() {
auto guard = _gate.hold();
auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vassert(
filtered_out_files == 0,
Expand Down Expand Up @@ -306,7 +309,10 @@ ss::future<> cache::trim(
auto guard = _gate.hold();
auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _]
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, [](std::string_view path) {
_cache_dir.native(),
_access_time_tracker,
_walk_concurrency(),
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
|| std::string_view(path).ends_with(".index"));
Expand Down Expand Up @@ -724,7 +730,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
// Enumerate ALL files in the cache (as opposed to trim_fast that strips out
// indices/tx/tmp files)
auto [walked_cache_size, _filtered_out, candidates, _]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vlog(
cst_log.debug,
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<double>,
config::binding<uint64_t>,
config::binding<std::optional<double>>,
config::binding<uint32_t>) noexcept;
config::binding<uint32_t>,
config::binding<uint16_t>) noexcept;

cache(const cache&) = delete;
cache(cache&& rhs) = delete;
Expand Down Expand Up @@ -301,6 +302,7 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<std::optional<double>> _max_percent;
uint64_t _max_bytes;
config::binding<uint32_t> _max_objects;
config::binding<uint16_t> _walk_concurrency;
void update_max_bytes();

ss::abort_source _as;
Expand Down
115 changes: 81 additions & 34 deletions src/v/cloud_storage/recursive_directory_walker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/sstring.hh>
Expand All @@ -40,7 +41,6 @@ struct walk_accumulator {
, filter(std::move(collect_filter)) {}

ss::future<> visit(ss::sstring const& target, ss::directory_entry entry) {
seen_dentries = true;
auto entry_path = fmt::format("{}/{}", target, entry.name);
if (entry.type && entry.type == ss::directory_entry_type::regular) {
auto file_stats = co_await ss::file_stat(entry_path);
Expand All @@ -67,33 +67,67 @@ struct walk_accumulator {
} else if (
entry.type && entry.type == ss::directory_entry_type::directory) {
vlog(cst_log.debug, "Dir found {}", entry_path);
dirlist.push_front(entry_path);
dirlist.emplace_front(entry_path);
}
}

bool empty() const { return dirlist.empty(); }

ss::sstring pop() {
auto r = dirlist.back();
auto r = std::move(dirlist.back());
dirlist.pop_back();
return r;
}

void reset_seen_dentries() { seen_dentries = false; }

const access_time_tracker& tracker;
bool seen_dentries{false};
std::deque<ss::sstring> dirlist;
std::optional<recursive_directory_walker::filter_type> filter;
fragmented_vector<file_list_item> files;
uint64_t current_cache_size{0};
size_t filtered_out_files{0};
};
} // namespace cloud_storage

namespace {
ss::future<> walker_process_directory(
const ss::sstring& start_dir,
ss::sstring target,
cloud_storage::walk_accumulator& state,
fragmented_vector<ss::sstring>& empty_dirs) {
try {
ss::file target_dir = co_await open_directory(target);

bool seen_dentries = false;
co_await target_dir
.list_directory(
[&state, &target, &seen_dentries](ss::directory_entry entry) {
seen_dentries = true;
return state.visit(target, std::move(entry));
})
.done()
.finally([target_dir]() mutable { return target_dir.close(); });

if (unlikely(!seen_dentries) && target != start_dir) {
empty_dirs.push_back(target);
}
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
// skip this directory, move to the next one
} else {
throw;
}
}
}
} // namespace
namespace cloud_storage {

ss::future<walk_result> recursive_directory_walker::walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter) {
vassert(max_concurrency > 0, "Max concurrency must be greater than 0");

auto guard = _gate.hold();

watchdog wd1m(std::chrono::seconds(60), [] {
Expand All @@ -108,36 +142,49 @@ ss::future<walk_result> recursive_directory_walker::walk(

fragmented_vector<ss::sstring> empty_dirs;

// Listing directories involves blocking I/O which in Seastar is serviced by
// a dedicated thread pool and workqueue. When listing directories with
// large number of sub-directories this leads to a large queue of work items
// for each of which we have to incur the cost of task switching. Even for a
// lightly loaded reactor, it can take up to 1ms for a full round-trip. The
// round-trip time is ~100x the syscall duration. With 2 level directory
// structure, 100K files, 2 getdents64 calls per directory this adds up to
// 400K syscalls and 6m of wall time.
//
// By running the directory listing in parallel we also parallelize the
// round-trip overhead. This leads to a significant speedup in the
// directory listing phase.
//
// Limit the number of concurrent directory reads to avoid running out of
// file descriptors.
//
// Empirical testing shows that this value is a good balance between
// performance and resource usage.
std::vector<ss::sstring> targets;
targets.reserve(max_concurrency);

while (!state.empty()) {
auto target = state.pop();
vassert(
std::string_view(target).starts_with(start_dir),
"Looking at directory {}, which is outside of initial dir "
"{}.",
target,
start_dir);

try {
ss::file target_dir = co_await open_directory(target);

state.reset_seen_dentries();
co_await target_dir
.list_directory([&state, &target](ss::directory_entry entry) {
return state.visit(target, std::move(entry));
})
.done()
.finally([target_dir]() mutable { return target_dir.close(); });

if (unlikely(!state.seen_dentries) && target != start_dir) {
empty_dirs.push_back(target);
}
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
// skip this directory, move to the ext one
} else {
throw;
}
targets.clear();

auto concurrency = std::min(
state.dirlist.size(), size_t(max_concurrency));

for (size_t i = 0; i < concurrency; ++i) {
auto target = state.pop();
vassert(
std::string_view(target).starts_with(start_dir),
"Looking at directory {}, which is outside of initial dir "
"{}.",
target,
start_dir);
targets.push_back(std::move(target));
}

co_await ss::parallel_for_each(
targets, [&start_dir, &state, &empty_dirs](ss::sstring target) {
return walker_process_directory(
start_dir, std::move(target), state, empty_dirs);
});
}

co_return walk_result{
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/recursive_directory_walker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class recursive_directory_walker {
ss::future<walk_result> walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter = std::nullopt);

private:
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cache_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class cache_test_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1_MiB + 500_KiB),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();
sharded_cache
.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cloud_storage_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ struct cloud_storage_fixture : s3_imposter_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1024 * 1024 * 1024),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();

cache.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
12 changes: 8 additions & 4 deletions src/v/cloud_storage/tests/directory_walker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ SEASTAR_THREAD_TEST_CASE(one_level) {
file2.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

auto expect = std::set<std::string>{
file_path1.native(), file_path2.native()};
Expand All @@ -81,6 +81,8 @@ SEASTAR_THREAD_TEST_CASE(three_levels) {

ss::recursive_touch_directory((target_dir / "a").native()).get();
ss::recursive_touch_directory((target_dir / "b" / "c").native()).get();
ss::recursive_touch_directory((target_dir / "b" / "c-empty").native())
.get();

auto flags = ss::open_flags::wo | ss::open_flags::create
| ss::open_flags::exclusive;
Expand All @@ -94,10 +96,11 @@ SEASTAR_THREAD_TEST_CASE(three_levels) {
file3.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 3);
BOOST_REQUIRE_EQUAL(result.empty_dirs.size(), 1);

auto expect = std::set<std::string>{
file_path1.native(), file_path2.native(), file_path3.native()};
Expand All @@ -115,7 +118,7 @@ SEASTAR_THREAD_TEST_CASE(no_files) {
ss::recursive_touch_directory(dir2.native()).get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand All @@ -127,7 +130,7 @@ SEASTAR_THREAD_TEST_CASE(empty_dir) {
const std::filesystem::path target_dir = tmpdir.get_path();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand Down Expand Up @@ -181,6 +184,7 @@ SEASTAR_THREAD_TEST_CASE(total_size_correct) {
.walk(
target_dir.native(),
tracker,
3,
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
Expand Down
10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,16 @@ configuration::configuration()
"elapsed",
{.visibility = visibility::tunable},
5s)
, cloud_storage_cache_trim_walk_concurrency(
*this,
"cloud_storage_cache_trim_walk_concurrency",
"The maximum number of concurrent tasks launched for directory walk "
"during cache trimming. A higher number allows cache trimming to run "
"faster but can cause latency spikes due to increased pressure on I/O "
"subsystem and syscall threads.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1,
{.min = 1, .max = 1000})
, cloud_storage_max_segment_readers_per_shard(
*this,
"cloud_storage_max_segment_readers_per_shard",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ struct configuration final : public config_store {
property<uint32_t> cloud_storage_cache_max_objects;
property<uint32_t> cloud_storage_cache_trim_carryover_bytes;
property<std::chrono::milliseconds> cloud_storage_cache_check_interval_ms;
bounded_property<uint16_t> cloud_storage_cache_trim_walk_concurrency;
property<std::optional<uint32_t>>
cloud_storage_max_segment_readers_per_shard;
property<std::optional<uint32_t>>
Expand Down
4 changes: 4 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,10 @@ void application::wire_up_redpanda_services(
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_max_objects.bind();
}),
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_trim_walk_concurrency.bind();
}))
.get();

Expand Down
Loading