Skip to content

Commit

Permalink
Merge pull request #18758 from nvartolomei/nv/walk-time
Browse files Browse the repository at this point in the history
cloud_storage: concurrent directory walking
  • Loading branch information
nvartolomei authored Jun 11, 2024
2 parents 7dfef88 + 3fedeb3 commit 4458d3b
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 45 deletions.
15 changes: 11 additions & 4 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ cache::cache(
config::binding<double> disk_reservation,
config::binding<uint64_t> max_bytes_cfg,
config::binding<std::optional<double>> max_percent,
config::binding<uint32_t> max_objects) noexcept
config::binding<uint32_t> max_objects,
config::binding<uint16_t> walk_concurrency) noexcept
: _cache_dir(std::move(cache_dir))
, _disk_size(disk_size)
, _disk_reservation(std::move(disk_reservation))
, _max_bytes_cfg(std::move(max_bytes_cfg))
, _max_percent(std::move(max_percent))
, _max_bytes(_max_bytes_cfg())
, _max_objects(std::move(max_objects))
, _walk_concurrency(std::move(walk_concurrency))
, _cnt(0)
, _total_cleaned(0) {
if (ss::this_shard_id() == ss::shard_id{0}) {
Expand Down Expand Up @@ -190,7 +192,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; }
ss::future<> cache::clean_up_at_start() {
auto guard = _gate.hold();
auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vassert(
filtered_out_files == 0,
Expand Down Expand Up @@ -315,7 +318,10 @@ ss::future<> cache::trim(
auto guard = _gate.hold();
auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _]
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, [](std::string_view path) {
_cache_dir.native(),
_access_time_tracker,
_walk_concurrency(),
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
|| std::string_view(path).ends_with(".index"));
Expand Down Expand Up @@ -733,7 +739,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
// Enumerate ALL files in the cache (as opposed to trim_fast that strips out
// indices/tx/tmp files)
auto [walked_cache_size, _filtered_out, candidates, _]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vlog(
cst_log.debug,
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<double>,
config::binding<uint64_t>,
config::binding<std::optional<double>>,
config::binding<uint32_t>) noexcept;
config::binding<uint32_t>,
config::binding<uint16_t>) noexcept;

cache(const cache&) = delete;
cache(cache&& rhs) = delete;
Expand Down Expand Up @@ -313,6 +314,7 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<std::optional<double>> _max_percent;
uint64_t _max_bytes;
config::binding<uint32_t> _max_objects;
config::binding<uint16_t> _walk_concurrency;
void update_max_bytes();

ss::abort_source _as;
Expand Down
115 changes: 81 additions & 34 deletions src/v/cloud_storage/recursive_directory_walker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/sstring.hh>
Expand All @@ -40,7 +41,6 @@ struct walk_accumulator {
, filter(std::move(collect_filter)) {}

ss::future<> visit(ss::sstring const& target, ss::directory_entry entry) {
seen_dentries = true;
auto entry_path = fmt::format("{}/{}", target, entry.name);
if (entry.type && entry.type == ss::directory_entry_type::regular) {
auto file_stats = co_await ss::file_stat(entry_path);
Expand All @@ -67,33 +67,67 @@ struct walk_accumulator {
} else if (
entry.type && entry.type == ss::directory_entry_type::directory) {
vlog(cst_log.debug, "Dir found {}", entry_path);
dirlist.push_front(entry_path);
dirlist.emplace_front(entry_path);
}
}

bool empty() const { return dirlist.empty(); }

ss::sstring pop() {
auto r = dirlist.back();
auto r = std::move(dirlist.back());
dirlist.pop_back();
return r;
}

void reset_seen_dentries() { seen_dentries = false; }

const access_time_tracker& tracker;
bool seen_dentries{false};
std::deque<ss::sstring> dirlist;
std::optional<recursive_directory_walker::filter_type> filter;
fragmented_vector<file_list_item> files;
uint64_t current_cache_size{0};
size_t filtered_out_files{0};
};
} // namespace cloud_storage

namespace {
ss::future<> walker_process_directory(
const ss::sstring& start_dir,
ss::sstring target,
cloud_storage::walk_accumulator& state,
fragmented_vector<ss::sstring>& empty_dirs) {
try {
ss::file target_dir = co_await open_directory(target);

bool seen_dentries = false;
co_await target_dir
.list_directory(
[&state, &target, &seen_dentries](ss::directory_entry entry) {
seen_dentries = true;
return state.visit(target, std::move(entry));
})
.done()
.finally([target_dir]() mutable { return target_dir.close(); });

if (unlikely(!seen_dentries) && target != start_dir) {
empty_dirs.push_back(target);
}
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
// skip this directory, move to the next one
} else {
throw;
}
}
}
} // namespace
namespace cloud_storage {

ss::future<walk_result> recursive_directory_walker::walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter) {
vassert(max_concurrency > 0, "Max concurrency must be greater than 0");

auto guard = _gate.hold();

watchdog wd1m(std::chrono::seconds(60), [] {
Expand All @@ -108,36 +142,49 @@ ss::future<walk_result> recursive_directory_walker::walk(

fragmented_vector<ss::sstring> empty_dirs;

// Listing directories involves blocking I/O which in Seastar is serviced by
// a dedicated thread pool and workqueue. When listing directories with
// large number of sub-directories this leads to a large queue of work items
// for each of which we have to incur the cost of task switching. Even for a
// lightly loaded reactor, it can take up to 1ms for a full round-trip. The
// round-trip time is ~100x the syscall duration. With 2 level directory
// structure, 100K files, 2 getdents64 calls per directory this adds up to
// 400K syscalls and 6m of wall time.
//
// By running the directory listing in parallel we also parallelize the
// round-trip overhead. This leads to a significant speedup in the
// directory listing phase.
//
// Limit the number of concurrent directory reads to avoid running out of
// file descriptors.
//
// Empirical testing shows that this value is a good balance between
// performance and resource usage.
std::vector<ss::sstring> targets;
targets.reserve(max_concurrency);

while (!state.empty()) {
auto target = state.pop();
vassert(
std::string_view(target).starts_with(start_dir),
"Looking at directory {}, which is outside of initial dir "
"{}.",
target,
start_dir);

try {
ss::file target_dir = co_await open_directory(target);

state.reset_seen_dentries();
co_await target_dir
.list_directory([&state, &target](ss::directory_entry entry) {
return state.visit(target, std::move(entry));
})
.done()
.finally([target_dir]() mutable { return target_dir.close(); });

if (unlikely(!state.seen_dentries) && target != start_dir) {
empty_dirs.push_back(target);
}
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
// skip this directory, move to the ext one
} else {
throw;
}
targets.clear();

auto concurrency = std::min(
state.dirlist.size(), size_t(max_concurrency));

for (size_t i = 0; i < concurrency; ++i) {
auto target = state.pop();
vassert(
std::string_view(target).starts_with(start_dir),
"Looking at directory {}, which is outside of initial dir "
"{}.",
target,
start_dir);
targets.push_back(std::move(target));
}

co_await ss::parallel_for_each(
targets, [&start_dir, &state, &empty_dirs](ss::sstring target) {
return walker_process_directory(
start_dir, std::move(target), state, empty_dirs);
});
}

co_return walk_result{
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/recursive_directory_walker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class recursive_directory_walker {
ss::future<walk_result> walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter = std::nullopt);

private:
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cache_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class cache_test_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1_MiB + 500_KiB),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();
sharded_cache
.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cloud_storage_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ struct cloud_storage_fixture : s3_imposter_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1024 * 1024 * 1024),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();

cache.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
12 changes: 8 additions & 4 deletions src/v/cloud_storage/tests/directory_walker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ SEASTAR_THREAD_TEST_CASE(one_level) {
file2.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

auto expect = std::set<std::string>{
file_path1.native(), file_path2.native()};
Expand All @@ -81,6 +81,8 @@ SEASTAR_THREAD_TEST_CASE(three_levels) {

ss::recursive_touch_directory((target_dir / "a").native()).get();
ss::recursive_touch_directory((target_dir / "b" / "c").native()).get();
ss::recursive_touch_directory((target_dir / "b" / "c-empty").native())
.get();

auto flags = ss::open_flags::wo | ss::open_flags::create
| ss::open_flags::exclusive;
Expand All @@ -94,10 +96,11 @@ SEASTAR_THREAD_TEST_CASE(three_levels) {
file3.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 3);
BOOST_REQUIRE_EQUAL(result.empty_dirs.size(), 1);

auto expect = std::set<std::string>{
file_path1.native(), file_path2.native(), file_path3.native()};
Expand All @@ -115,7 +118,7 @@ SEASTAR_THREAD_TEST_CASE(no_files) {
ss::recursive_touch_directory(dir2.native()).get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand All @@ -127,7 +130,7 @@ SEASTAR_THREAD_TEST_CASE(empty_dir) {
const std::filesystem::path target_dir = tmpdir.get_path();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand Down Expand Up @@ -181,6 +184,7 @@ SEASTAR_THREAD_TEST_CASE(total_size_correct) {
.walk(
target_dir.native(),
tracker,
3,
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
Expand Down
10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,16 @@ configuration::configuration()
"elapsed",
{.visibility = visibility::tunable},
5s)
, cloud_storage_cache_trim_walk_concurrency(
*this,
"cloud_storage_cache_trim_walk_concurrency",
"The maximum number of concurrent tasks launched for directory walk "
"during cache trimming. A higher number allows cache trimming to run "
"faster but can cause latency spikes due to increased pressure on I/O "
"subsystem and syscall threads.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1,
{.min = 1, .max = 1000})
, cloud_storage_max_segment_readers_per_shard(
*this,
"cloud_storage_max_segment_readers_per_shard",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ struct configuration final : public config_store {
property<uint32_t> cloud_storage_cache_max_objects;
property<uint32_t> cloud_storage_cache_trim_carryover_bytes;
property<std::chrono::milliseconds> cloud_storage_cache_check_interval_ms;
bounded_property<uint16_t> cloud_storage_cache_trim_walk_concurrency;
property<std::optional<uint32_t>>
cloud_storage_max_segment_readers_per_shard;
property<std::optional<uint32_t>>
Expand Down
4 changes: 4 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,10 @@ void application::wire_up_redpanda_services(
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_max_objects.bind();
}),
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_trim_walk_concurrency.bind();
}))
.get();

Expand Down

0 comments on commit 4458d3b

Please sign in to comment.