Skip to content

Commit

Permalink
config: cloud_storage_cache_trim_walk_concurrency
Browse files Browse the repository at this point in the history
(cherry picked from commit 5d48458)
  • Loading branch information
nvartolomei authored and vbotbuildovich committed Jun 12, 2024
1 parent b086bb8 commit 0f283a7
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 13 deletions.
15 changes: 11 additions & 4 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ cache::cache(
config::binding<double> disk_reservation,
config::binding<uint64_t> max_bytes_cfg,
config::binding<std::optional<double>> max_percent,
config::binding<uint32_t> max_objects) noexcept
config::binding<uint32_t> max_objects,
config::binding<uint16_t> walk_concurrency) noexcept
: _cache_dir(std::move(cache_dir))
, _disk_size(disk_size)
, _disk_reservation(std::move(disk_reservation))
, _max_bytes_cfg(std::move(max_bytes_cfg))
, _max_percent(std::move(max_percent))
, _max_bytes(_max_bytes_cfg())
, _max_objects(std::move(max_objects))
, _walk_concurrency(std::move(walk_concurrency))
, _cnt(0)
, _total_cleaned(0) {
if (ss::this_shard_id() == ss::shard_id{0}) {
Expand Down Expand Up @@ -191,7 +193,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; }
ss::future<> cache::clean_up_at_start() {
auto guard = _gate.hold();
auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vassert(
filtered_out_files == 0,
Expand Down Expand Up @@ -306,7 +309,10 @@ ss::future<> cache::trim(
auto guard = _gate.hold();
auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _]
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, [](std::string_view path) {
_cache_dir.native(),
_access_time_tracker,
_walk_concurrency(),
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
|| std::string_view(path).ends_with(".index"));
Expand Down Expand Up @@ -724,7 +730,8 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
// Enumerate ALL files in the cache (as opposed to trim_fast that strips out
// indices/tx/tmp files)
auto [walked_cache_size, _filtered_out, candidates, _]
= co_await _walker.walk(_cache_dir.native(), _access_time_tracker);
= co_await _walker.walk(
_cache_dir.native(), _access_time_tracker, _walk_concurrency());

vlog(
cst_log.debug,
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<double>,
config::binding<uint64_t>,
config::binding<std::optional<double>>,
config::binding<uint32_t>) noexcept;
config::binding<uint32_t>,
config::binding<uint16_t>) noexcept;

cache(const cache&) = delete;
cache(cache&& rhs) = delete;
Expand Down Expand Up @@ -301,6 +302,7 @@ class cache : public ss::peering_sharded_service<cache> {
config::binding<std::optional<double>> _max_percent;
uint64_t _max_bytes;
config::binding<uint32_t> _max_objects;
config::binding<uint16_t> _walk_concurrency;
void update_max_bytes();

ss::abort_source _as;
Expand Down
5 changes: 3 additions & 2 deletions src/v/cloud_storage/recursive_directory_walker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ namespace cloud_storage {
ss::future<walk_result> recursive_directory_walker::walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter) {
vassert(max_concurrency > 0, "Max concurrency must be greater than 0");

auto guard = _gate.hold();

watchdog wd1m(std::chrono::seconds(60), [] {
Expand Down Expand Up @@ -157,8 +160,6 @@ ss::future<walk_result> recursive_directory_walker::walk(
//
// Empirical testing shows that this value is a good balance between
// performance and resource usage.
const size_t max_concurrency = 1000;

std::vector<ss::sstring> targets;
targets.reserve(max_concurrency);

Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/recursive_directory_walker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class recursive_directory_walker {
// Recursively walks the directory tree rooted at `start_dir` and resolves to
// a `walk_result` describing what was found.
//
// `tracker`         - access time tracker consulted during the walk.
// `max_concurrency` - upper bound on the number of concurrent tasks the walk
//                     may launch; must be greater than 0 (the implementation
//                     asserts this).
// `collect_filter`  - optional predicate over file paths; when omitted no
//                     filtering is applied.
//                     NOTE(review): presumably paths for which the predicate
//                     returns false are counted as filtered out instead of
//                     being collected -- confirm against the implementation.
ss::future<walk_result> walk(
ss::sstring start_dir,
const access_time_tracker& tracker,
uint16_t max_concurrency,
std::optional<filter_type> collect_filter = std::nullopt);

private:
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cache_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class cache_test_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1_MiB + 500_KiB),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();
sharded_cache
.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/cloud_storage_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ struct cloud_storage_fixture : s3_imposter_fixture {
config::mock_binding<double>(0.0),
config::mock_binding<uint64_t>(1024 * 1024 * 1024),
config::mock_binding<std::optional<double>>(std::nullopt),
config::mock_binding<uint32_t>(100000))
config::mock_binding<uint32_t>(100000),
config::mock_binding<uint16_t>(3))
.get();

cache.invoke_on_all([](cloud_storage::cache& c) { return c.start(); })
Expand Down
9 changes: 5 additions & 4 deletions src/v/cloud_storage/tests/directory_walker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ SEASTAR_THREAD_TEST_CASE(one_level) {
file2.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

auto expect = std::set<std::string>{
file_path1.native(), file_path2.native()};
Expand Down Expand Up @@ -96,7 +96,7 @@ SEASTAR_THREAD_TEST_CASE(three_levels) {
file3.close().get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 3);
Expand All @@ -118,7 +118,7 @@ SEASTAR_THREAD_TEST_CASE(no_files) {
ss::recursive_touch_directory(dir2.native()).get();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand All @@ -130,7 +130,7 @@ SEASTAR_THREAD_TEST_CASE(empty_dir) {
const std::filesystem::path target_dir = tmpdir.get_path();

access_time_tracker tracker;
auto result = _walker.walk(target_dir.native(), tracker).get();
auto result = _walker.walk(target_dir.native(), tracker, 3).get();

BOOST_REQUIRE_EQUAL(result.cache_size, 0);
BOOST_REQUIRE_EQUAL(result.regular_files.size(), 0);
Expand Down Expand Up @@ -184,6 +184,7 @@ SEASTAR_THREAD_TEST_CASE(total_size_correct) {
.walk(
target_dir.native(),
tracker,
3,
[](std::string_view path) {
return !(
std::string_view(path).ends_with(".tx")
Expand Down
10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,16 @@ configuration::configuration()
"elapsed",
{.visibility = visibility::tunable},
5s)
, cloud_storage_cache_trim_walk_concurrency(
*this,
"cloud_storage_cache_trim_walk_concurrency",
"The maximum number of concurrent tasks launched for directory walk "
"during cache trimming. A higher number allows cache trimming to run "
"faster but can cause latency spikes due to increased pressure on I/O "
"subsystem and syscall threads.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
1,
{.min = 1, .max = 1000})
, cloud_storage_max_segment_readers_per_shard(
*this,
"cloud_storage_max_segment_readers_per_shard",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ struct configuration final : public config_store {
property<uint32_t> cloud_storage_cache_max_objects;
property<uint32_t> cloud_storage_cache_trim_carryover_bytes;
property<std::chrono::milliseconds> cloud_storage_cache_check_interval_ms;
bounded_property<uint16_t> cloud_storage_cache_trim_walk_concurrency;
property<std::optional<uint32_t>>
cloud_storage_max_segment_readers_per_shard;
property<std::optional<uint32_t>>
Expand Down
4 changes: 4 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,10 @@ void application::wire_up_redpanda_services(
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_max_objects.bind();
}),
ss::sharded_parameter([] {
return config::shard_local_cfg()
.cloud_storage_cache_trim_walk_concurrency.bind();
}))
.get();

Expand Down

0 comments on commit 0f283a7

Please sign in to comment.