cloud_storage: concurrent directory walking
This speeds up cache trimming by 2 orders of magnitude even on a lightly
loaded reactor. The cost is a higher number of open fds (up to 2K more:
for each of up to 1K in-flight directories, one for the directory and one
for the stat call in `walk_accumulator::visit`) and higher pressure on the
reactor (an extra 1K futures/tasks), the IO subsystem, and the syscall
thread.
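
The trade-off can be checked with a back-of-the-envelope estimate. The sketch below plugs in the figures quoted in the code comment of this commit (2-level directory structure, 100K files, 2 getdents64 calls per directory, ~1ms round-trip, round-trip ~100x the syscall duration). `walk_estimate` and `estimate_walk` are hypothetical names, not part of the codebase. The model also assumes that the syscalls themselves still execute one at a time on the syscall thread, which is an assumption of this sketch and not something the commit states.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper for illustration; nothing here exists in the commit.
struct walk_estimate {
    std::uint64_t syscalls;    // total getdents64 calls
    double sequential_seconds; // one round-trip at a time
    double concurrent_seconds; // with `concurrency` listings in flight
    std::uint64_t extra_fds;   // worst-case additional open fds
};

inline walk_estimate estimate_walk(
  std::uint64_t files,
  std::uint64_t dir_levels,
  std::uint64_t getdents_per_dir,
  double round_trip_seconds,
  double round_trip_to_syscall_ratio,
  std::uint64_t concurrency) {
    // Assumption: every file sits in its own chain of `dir_levels`
    // directories, as in the test data layout <hash>/kafka/segment.bin.
    const std::uint64_t syscalls = files * dir_levels * getdents_per_dir;
    const double sequential = static_cast<double>(syscalls)
                              * round_trip_seconds;
    // Assumption: syscalls still run one at a time on the syscall thread,
    // so overlapping round-trips cannot go below the pure syscall time.
    const double syscall_floor = sequential / round_trip_to_syscall_ratio;
    const double concurrent = std::max(
      sequential / static_cast<double>(concurrency), syscall_floor);
    // Two fds per in-flight directory: the directory itself and the stat.
    return {syscalls, sequential, concurrent, 2 * concurrency};
}
```

With the comment's figures, `estimate_walk(100000, 2, 2, 1e-3, 100.0, 1000)` gives 400K syscalls, roughly 400 s of sequential wall time, and roughly 4 s once the round-trips overlap. Under these assumptions that is in line with the two orders of magnitude quoted above.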

"Lightly loaded reactor" test case:

```cpp
SEASTAR_THREAD_TEST_CASE(empty_dir) {
    cloud_storage::recursive_directory_walker w;
    const std::filesystem::path target_dir = "/tmp/recdirtest";

    stress_config cfg;
    cfg.min_spins_per_scheduling_point = 1;
    cfg.max_spins_per_scheduling_point = 1;
    cfg.num_fibers = 1;
    auto mgr = stress_fiber_manager{};
    BOOST_REQUIRE(mgr.start(cfg));

    access_time_tracker tracker;
    auto result = w.walk(target_dir.native(), tracker).get();

    mgr.stop().get();
}
```

`/tmp/recdirtest` is populated by running the following script from inside that directory:

```py
import os
import xxhash
from pathlib import Path

for folders in range(0, 600000):
    x = xxhash.xxh32()
    x.update(f'folder{folders}')
    d = x.hexdigest()
    Path(f"./{d}/kafka").mkdir(parents=True, exist_ok=True)
    open(f'./{d}/kafka/segment.bin', 'a').close()
```

(cherry picked from commit ee34998)
nvartolomei authored and vbotbuildovich committed Jun 12, 2024
1 parent 348526e commit b086bb8
Showing 1 changed file with 45 additions and 10 deletions.
src/v/cloud_storage/recursive_directory_walker.cc (45 additions, 10 deletions)
@@ -18,6 +18,7 @@

 #include <seastar/core/coroutine.hh>
 #include <seastar/core/file.hh>
+#include <seastar/core/loop.hh>
 #include <seastar/core/seastar.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/sstring.hh>
@@ -138,17 +139,51 @@ ss::future<walk_result> recursive_directory_walker::walk(

     fragmented_vector<ss::sstring> empty_dirs;

+    // Listing directories involves blocking I/O which in Seastar is serviced by
+    // a dedicated thread pool and workqueue. When listing directories with
+    // large number of sub-directories this leads to a large queue of work items
+    // for each of which we have to incur the cost of task switching. Even for a
+    // lightly loaded reactor, it can take up to 1ms for a full round-trip. The
+    // round-trip time is ~100x the syscall duration. With 2 level directory
+    // structure, 100K files, 2 getdents64 calls per directory this adds up to
+    // 400K syscalls and 6m of wall time.
+    //
+    // By running the directory listing in parallel we also parallelize the
+    // round-trip overhead. This leads to a significant speedup in the
+    // directory listing phase.
+    //
+    // Limit the number of concurrent directory reads to avoid running out of
+    // file descriptors.
+    //
+    // Empirical testing shows that this value is a good balance between
+    // performance and resource usage.
+    const size_t max_concurrency = 1000;
+
+    std::vector<ss::sstring> targets;
+    targets.reserve(max_concurrency);
+
     while (!state.empty()) {
-        auto target = state.pop();
-        vassert(
-          std::string_view(target).starts_with(start_dir),
-          "Looking at directory {}, which is outside of initial dir "
-          "{}.",
-          target,
-          start_dir);
-
-        co_await walker_process_directory(
-          start_dir, std::move(target), state, empty_dirs);
+        targets.clear();
+
+        auto concurrency = std::min(
+          state.dirlist.size(), size_t(max_concurrency));
+
+        for (size_t i = 0; i < concurrency; ++i) {
+            auto target = state.pop();
+            vassert(
+              std::string_view(target).starts_with(start_dir),
+              "Looking at directory {}, which is outside of initial dir "
+              "{}.",
+              target,
+              start_dir);
+            targets.push_back(std::move(target));
+        }
+
+        co_await ss::parallel_for_each(
+          targets, [&start_dir, &state, &empty_dirs](ss::sstring target) {
+              return walker_process_directory(
+                start_dir, std::move(target), state, empty_dirs);
+          });
     }

     co_return walk_result{
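
The batching loop in this diff follows a common pattern: take up to `max_concurrency` directories off the work list, process them concurrently, wait for the whole batch, then repeat with whatever new directories were discovered. The sketch below shows the same pattern with only the C++ standard library. It is not the Seastar implementation: `dir_tree` and `batched_walk` are hypothetical names, the tree is an in-memory map instead of a real filesystem, and `std::async` threads plus a mutex stand in for futures on a single reactor.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <iterator>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for a directory tree: path -> child dirs.
using dir_tree = std::map<std::string, std::vector<std::string>>;

// Walk `tree` from `root`, listing at most `max_concurrency` directories at
// a time, and return every directory visited (in no particular order).
std::vector<std::string> batched_walk(
  const dir_tree& tree, const std::string& root, std::size_t max_concurrency) {
    max_concurrency = std::max<std::size_t>(1, max_concurrency);
    std::mutex m; // guards `pending` and `visited` across worker threads
    std::vector<std::string> pending{root};
    std::vector<std::string> visited;

    while (!pending.empty()) {
        // Take a batch of up to max_concurrency directories off the list.
        const auto n = static_cast<std::ptrdiff_t>(
          std::min(pending.size(), max_concurrency));
        std::vector<std::string> batch(
          std::make_move_iterator(pending.end() - n),
          std::make_move_iterator(pending.end()));
        pending.erase(pending.end() - n, pending.end());

        // List every directory of the batch concurrently.
        std::vector<std::future<void>> inflight;
        inflight.reserve(batch.size());
        for (const auto& dir : batch) {
            inflight.push_back(std::async(std::launch::async, [&, dir] {
                const auto it = tree.find(dir);
                std::lock_guard<std::mutex> lock(m);
                visited.push_back(dir);
                if (it != tree.end()) {
                    // Newly found directories wait for a later batch.
                    pending.insert(
                      pending.end(), it->second.begin(), it->second.end());
                }
            }));
        }

        // Wait for the whole batch before starting the next one.
        for (auto& f : inflight) {
            f.get();
        }
    }
    return visited;
}
```

Because each batch is awaited as a whole, one slow directory delays the start of the next batch. A semaphore-style limiter would keep the number of in-flight listings constant instead, at the cost of more bookkeeping. That is an alternative design and not something this commit does.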
