Skip to content

Commit

Permalink
topic_recovery_service: coro -> continuation for collect_manifest_paths
Browse files Browse the repository at this point in the history
arm64 + clang-14 miscompiles this coroutine (evidence: the prefix should
have the form "[0-f]0000000", but in tests it can appear as
"0000000").

This commit translates the coroutine code into a chain of futures.

The return type is also simplified, since the error result was never set.
  • Loading branch information
andijcr committed May 18, 2023
1 parent 56ec997 commit 3adac73
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,35 +222,48 @@ topic_recovery_service::recovery_status_log() const {
return {_status_log.begin(), _status_log.end()};
}

// collect_manifest_paths: lists objects in cfg.bucket under each of the 16
// prefixes "<hex>0000000/" (hex in 0-9a-f) and collects every returned key
// as a remote_segment_path. A failed listing for one prefix is logged and
// skipped; the other prefixes are still processed.
//
// NOTE(review): this span is a diff rendering whose +/- markers were lost, so
// two versions of the function appear interleaved below — presumably the
// pre-commit coroutine version (co_await / co_return) followed by the
// post-commit continuation version (ss::do_with / ss::do_for_each). Confirm
// against the actual commit before relying on line ownership.
static ss::future<result<std::vector<remote_segment_path>, recovery_error_ctx>>
collect_manifest_paths(
// NOTE rewritten as continuations to address arm64 miscompilation of coroutines
// under clang-14
static ss::future<std::vector<remote_segment_path>> collect_manifest_paths(
remote& remote, ss::abort_source& as, const recovery_task_config& cfg) {
const auto& bucket = cfg.bucket;
auto rtc = make_rtc(as, cfg);

// Look under each manifest prefix for topic manifests.
const char hex_chars[] = "0123456789abcdef";
std::vector<remote_segment_path> paths;
for (int i = 0; i < 16; ++i) {
// Prefix is one hex digit followed by seven zeros and a trailing slash.
const auto prefix = fmt::format("{}0000000/", hex_chars[i]);
auto rtc = make_rtc(as, cfg);

// This request is restricted to prefix, it should only return the
// metadata files for a topic.
auto meta = co_await remote.list_objects(
bucket, rtc, cloud_storage_clients::object_key{prefix});
// A listing error is logged and this prefix is skipped; the loop goes on.
if (meta.has_error()) {
vlog(cst_log.error, "Failed to list meta items: {}", meta.error());
continue;
}

// Record the key of every listed item.
for (auto&& item : meta.value().contents) {
vlog(cst_log.trace, "adding path {} for {}", item.key, prefix);
paths.emplace_back(item.key);
}
}

co_return paths;
// One character per prefix to scan.
constexpr static auto hex_chars = std::string_view{"0123456789abcdef"};
// The result vector is owned by ss::do_with so the lambdas below can refer to
// it by reference (presumably it stays alive until the returned future
// resolves, per Seastar's do_with contract — confirm).
return ss::do_with(std::vector<remote_segment_path>{}, [&](auto& paths) {
// Visit every hex character in turn, issuing one list request for each.
return ss::do_for_each(
hex_chars,
[&](char hex_ch) {
// Each iteration gets its own retry_chain_node and its own formatted
// prefix string, both held by do_with for the list_objects call.
return ss::do_with(
std::make_unique<retry_chain_node>(
as, cfg.operation_timeout_ms, cfg.backoff_ms),
fmt::format("{}0000000/", hex_ch),
[&](auto& rtc, auto& prefix) {
return remote
.list_objects(
cfg.bucket,
*rtc,
cloud_storage_clients::object_key{prefix})
.then([&](auto meta) {
// A listing error is logged and this prefix contributes no
// paths; the overall result is not turned into a failure.
if (meta.has_error()) {
vlog(
cst_log.error,
"Failed to list meta items: {}",
meta.error());
return;
}

// Record the key of every listed item.
for (auto&& item : meta.value().contents) {
vlog(
cst_log.trace,
"adding path {} for {}",
item.key,
prefix);
paths.emplace_back(item.key);
}
});
});
})
// After all prefixes have been visited, move the accumulated paths out as
// the value of the returned future.
.then([&] { return std::move(paths); });
});
}

ss::future<result<void, recovery_error_ctx>>
Expand Down Expand Up @@ -282,21 +295,9 @@ topic_recovery_service::start_bg_recovery_task(recovery_request request) {

set_state(state::scanning_bucket);
vlog(cst_log.debug, "scanning bucket {}", _config.bucket);
auto bucket_contents_result = co_await collect_manifest_paths(
auto bucket_contents = co_await collect_manifest_paths(
_remote.local(), _as, _config);

if (bucket_contents_result.has_error()) {
auto error = recovery_error_ctx::make(
fmt::format("error while listing items"),
recovery_error_code::error_listing_items);
vlog(cst_log.error, "{}", error.context);
_recovery_request = std::nullopt;
set_state(state::inactive);
co_return error;
}

auto bucket_contents = bucket_contents_result.value();

auto manifests = co_await filter_existing_topics(
bucket_contents, request, model::ns{"kafka"});

Expand Down

0 comments on commit 3adac73

Please sign in to comment.