From 9d83003b5a4618dfb61e87f9a970e398072cca10 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 27 Jun 2023 18:13:32 +0200 Subject: [PATCH] topic_recovery_service: coro -> continuation for collect_manifest_paths arm64 + clang-14 causes a miscompilation of this coroutine (evidence: the prefix should be in the form "[0-f]0000000", but it can appear in tests as "0000000"). This commit translates the coroutine code into a chain of futures. The return type is also simplified, since the error result was never set. Partial backport of e37eabd255021d2259b5fccc25432607f7fe63fd and backport of 3adac73a5cab2386f21442ff575f2726b23246c4 --- src/v/cloud_storage/topic_recovery_service.cc | 110 +++++++----------- 1 file changed, 42 insertions(+), 68 deletions(-) diff --git a/src/v/cloud_storage/topic_recovery_service.cc b/src/v/cloud_storage/topic_recovery_service.cc index 9a501cd75ed78..e1c90e47cb25b 100644 --- a/src/v/cloud_storage/topic_recovery_service.cc +++ b/src/v/cloud_storage/topic_recovery_service.cc @@ -221,62 +221,48 @@ topic_recovery_service::recovery_status_log() const { return {_status_log.begin(), _status_log.end()}; } -static ss::future, recovery_error_ctx>> -collect_manifest_paths( +// NOTE rewritten as continuations to address arm64 miscompilation of coroutines +// under clang-14 +static ss::future> collect_manifest_paths( remote& remote, ss::abort_source& as, const recovery_task_config& cfg) { - const auto& bucket = cfg.bucket; - auto rtc = make_rtc(as, cfg); - - // List only the items at the top of the bucket hierarchy. The delimiter - // ensures that any "directories" will be collected into the common_prefixes - // field of the result. 
- auto top_level = co_await remote.list_objects( - bucket, rtc, std::nullopt, '/'); - if (top_level.has_error()) { - vlog( - cst_log.error, - "Failed to list top level items: {}", - top_level.error()); - co_return recovery_error_ctx::make("failed to list top level items"); - } - - auto prefixes = top_level.value().common_prefixes; - for (const auto& prefix : prefixes) { - vlog(cst_log.trace, "found top level prefix: {}", prefix); - } - - // Filter out prefixes which do not match the prefix expression that topic - // manifests use - auto it = std::remove_if( - prefixes.begin(), prefixes.end(), [](const auto& prefix) { - return !std::regex_match(prefix.cbegin(), prefix.cend(), prefix_expr); - }); - prefixes.erase(it, prefixes.end()); - - for (auto& prefix : prefixes) { - vlog(cst_log.trace, "found possible topic meta prefix: {}", prefix); - } - - std::vector paths; - for (const auto& prefix : prefixes) { - auto rtc = make_rtc(as, cfg); - - // This request is restricted to prefix, it should only return the - // metadata files for a topic. - auto meta = co_await remote.list_objects( - bucket, rtc, cloud_storage_clients::object_key{prefix}); - if (meta.has_error()) { - vlog(cst_log.error, "Failed to list meta items: {}", meta.error()); - continue; - } - - for (auto&& item : meta.value().contents) { - vlog(cst_log.trace, "adding path {} for {}", item.key, prefix); - paths.emplace_back(item.key); - } - } - - co_return paths; + // Look under each manifest prefix for topic manifests. 
+ constexpr static auto hex_chars = std::string_view{"0123456789abcdef"}; + return ss::do_with(std::vector{}, [&](auto& paths) { + return ss::do_for_each( + hex_chars, + [&](char hex_ch) { + return ss::do_with( + std::make_unique( + as, cfg.operation_timeout_ms, cfg.backoff_ms), + fmt::format("{}0000000/", hex_ch), + [&](auto& rtc, auto& prefix) { + return remote + .list_objects( + cfg.bucket, + *rtc, + cloud_storage_clients::object_key{prefix}) + .then([&](auto meta) { + if (meta.has_error()) { + vlog( + cst_log.error, + "Failed to list meta items: {}", + meta.error()); + return; + } + + for (auto&& item : meta.value().contents) { + vlog( + cst_log.trace, + "adding path {} for {}", + item.key, + prefix); + paths.emplace_back(item.key); + } + }); + }); + }) + .then([&] { return std::move(paths); }); + }); } ss::future> @@ -320,21 +306,9 @@ topic_recovery_service::start_bg_recovery_task(recovery_request request) { set_state(state::scanning_bucket); vlog(cst_log.debug, "scanning bucket {}", _config.bucket); - auto bucket_contents_result = co_await collect_manifest_paths( + auto bucket_contents = co_await collect_manifest_paths( _remote.local(), _as, _config); - if (bucket_contents_result.has_error()) { - auto error = recovery_error_ctx::make( - fmt::format("error while listing items"), - recovery_error_code::error_listing_items); - vlog(cst_log.error, "{}", error.context); - _recovery_request = std::nullopt; - set_state(state::inactive); - co_return error; - } - - auto bucket_contents = bucket_contents_result.value(); - auto manifests = co_await filter_existing_topics( bucket_contents, request, model::ns{"kafka"});