Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: remote labels #20778

Merged
merged 34 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44978ef
offline_log_viewer: support remote_labels in topic properties
andrwng Jun 20, 2024
7d38dba
cluster: add remote label as topic property
andrwng Jun 12, 2024
fab93f1
archival_stm: add remote path provider to archival stm
andrwng Jun 29, 2024
cda86a5
ntp_archiver: use path provider for naming
andrwng Jun 20, 2024
160a40e
cloud_storage: plumb path provider into async manifest view
andrwng Jun 20, 2024
291ed3f
cluster: use topic_manifest_downloader in topic recovery
andrwng Jun 17, 2024
b6d1e16
cluster: use topic_manifest_downloader in list-based recovery
andrwng Jun 25, 2024
41df563
archival_stm: use partition_manifest_downloader for snapshot recovery
andrwng Jun 18, 2024
e16c247
ntp_archiver: use partition_manifest_downloader for read replicas
andrwng Jun 17, 2024
13a1567
cluster: use partition_manifest_downloader in partition recovery
andrwng Jun 17, 2024
fe640c9
cluster: use partition_manifest_downloader in topic recovery validation
andrwng Jun 20, 2024
9d97ff6
cluster: use partition_manifest_downloader for unsafe reset from cloud
andrwng Jun 17, 2024
8cb5db2
cloud_storage: use partition_manifest_downloader in scrubber/anomaly …
andrwng Jun 20, 2024
ebf2fa1
remote_partition: use partition_manifest_downloader for manifest fina…
andrwng Jun 17, 2024
653c9ec
cloud_storage: remove try_download_partition_manifest
andrwng Jun 17, 2024
eaa7b95
cloud_storage: remove remote::partition_manifest_exists()
andrwng Jun 20, 2024
849aef7
cloud_storage: use path provider for spillovers in manifest view
andrwng Jun 19, 2024
5248547
remote_partition: use path provider for segment paths
andrwng Jun 18, 2024
3937454
cloud_storage: add utils for lifecycle marker paths
andrwng Jun 29, 2024
d10a60d
archival: use path provider to generate lifecycle marker paths
andrwng Jun 29, 2024
5d30320
archival: use the path provider throughout the purger
andrwng Jun 18, 2024
a0be06a
archival/segment_merger: use path provider for remote runs
andrwng Jun 18, 2024
d2681ba
admin: use path provider for anomalies report
andrwng Jun 19, 2024
1066344
features: add flag for remote labels
andrwng Jul 4, 2024
2d7e19a
config: property for disabling remote labels for tests
andrwng Jun 20, 2024
e7669a9
cloud_storage: clean up manifest path generation from manifests
andrwng Jun 20, 2024
46ed453
cloud_storage: clean up remote segment name generation
andrwng Jun 29, 2024
459106c
topic_manifest: remove feature table
andrwng Jun 22, 2024
902f522
rptest/services: make get_cluster_uuid() node optional
andrwng Jun 24, 2024
245b4f2
rptest: support remote labels in BucketView
andrwng Jun 24, 2024
df36562
rptest: option to change si_settings bucket
andrwng Jul 1, 2024
45c62a4
cluster: plug cluster uuid into new topics
andrwng Jun 29, 2024
b6ae520
config: enable remote labels by default
andrwng Jul 3, 2024
263d4b3
rptest: add remote_label_test
andrwng Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions src/v/cloud_storage/tests/topic_recovery_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ generate_no_manifests_expectations(
.body = no_manifests,
});
}
expectations.emplace_back(s3_imposter_fixture::expectation{
.url = fmt::format(
"?list-type=2&prefix=meta/{}/{}/", tp_ns.ns(), tp_ns.tp()),
.body = no_manifests,
});
expectations.emplace_back(s3_imposter_fixture::expectation{
.url = "?list-type=2&prefix=meta/",
.body = no_manifests,
});
for (auto& e : additional_expectations) {
expectations.emplace_back(std::move(e));
}
Expand Down Expand Up @@ -228,15 +237,15 @@ FIXTURE_TEST(recovery_with_no_topics_exits_early, fixture) {

const auto& list_topics_req = get_requests()[0];
BOOST_REQUIRE_EQUAL(
list_topics_req.url, "/" + url_base() + "?list-type=2&prefix=00000000/");
list_topics_req.url, "/" + url_base() + "?list-type=2&prefix=meta/");

// Wait until recovery exits after finding no topics to create
tests::cooperative_spin_wait_with_timeout(10s, [&service] {
return service.local().is_active() == false;
}).get();

// No other calls were made
BOOST_REQUIRE_EQUAL(get_requests().size(), 16);
BOOST_REQUIRE_EQUAL(get_requests().size(), 17);
}

void do_test(fixture& f) {
Expand All @@ -250,20 +259,20 @@ void do_test(fixture& f) {
BOOST_REQUIRE_EQUAL(result, expected);

// Wait until three requests are received:
// 1..16. to list bucket for topic meta prefixes
// 17. to download manifest
f.wait_for_n_requests(17, fixture::equals::yes);
// 1. meta/kafka for labeled topic manifests
// 2..17. to list bucket for topic meta prefixes
// 18..20. to download manifest, which now takes three requests
f.wait_for_n_requests(20, fixture::equals::yes);

const auto& get_manifest_req = f.get_requests()[16];
const auto& get_manifest_req = f.get_requests()[19];
BOOST_REQUIRE_EQUAL(
get_manifest_req.url, "/" + f.url_base() + manifest.url);

// Wait until recovery exits after finding no topics to create
tests::cooperative_spin_wait_with_timeout(10s, [&service] {
return service.local().is_active() == false;
}).get();
RPTEST_REQUIRE_EVENTUALLY(
10s, [&service] { return service.local().is_active() == false; });

BOOST_REQUIRE_EQUAL(f.get_requests().size(), 17);
BOOST_REQUIRE_EQUAL(f.get_requests().size(), 20);
}

FIXTURE_TEST(recovery_with_unparseable_topic_manifest, fixture) {
Expand Down Expand Up @@ -360,9 +369,10 @@ FIXTURE_TEST(recovery_result_clear_before_start, fixture) {
start_recovery();
wait_for_n_requests(22);

// 16 to check each manifest prefix, 1 to download the topic manifest, 1 to
// check recovery results, 1 to delete.
const auto& delete_request = get_requests()[18];
// 1 to check the labeled root, 16 to check each manifest prefix, 3 to
// download the JSON topic manifest, 1 to check recovery results, 1 to
// delete.
const auto& delete_request = get_requests()[21];
BOOST_REQUIRE_EQUAL(delete_request.url, "/" + url_base() + "?delete");
BOOST_REQUIRE_EQUAL(delete_request.method, "POST");
}
Expand Down Expand Up @@ -404,7 +414,7 @@ FIXTURE_TEST(recovery_with_topic_name_pattern_without_match, fixture) {
return !service.local().is_active();
}).get();

BOOST_REQUIRE_EQUAL(get_requests().size(), 16);
BOOST_REQUIRE_EQUAL(get_requests().size(), 17);
}

FIXTURE_TEST(recovery_with_topic_name_pattern_with_match, fixture) {
Expand Down
214 changes: 55 additions & 159 deletions src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
#include "cloud_storage/recovery_request.h"
#include "cloud_storage/recovery_utils.h"
#include "cloud_storage/topic_manifest.h"
#include "cloud_storage/topic_manifest_downloader.h"
#include "cluster/topic_recovery_status_frontend.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/http/request.hh>
#include <seastar/util/defer.hh>

Expand All @@ -31,12 +34,6 @@

namespace {

const std::regex manifest_path_expr{
R"REGEX(\w+/meta/(.*?)/(.*?)/topic_manifest\.(json|bin))REGEX"};

// Possible prefix for a path which contains a topic manifest file
const std::regex prefix_expr{"[a-fA-F0-9]0000000/"};

constexpr size_t list_api_timeout_multiplier{10};

constexpr ss::lowres_clock::duration downloads_check_interval{60s};
Expand Down Expand Up @@ -226,50 +223,6 @@ topic_recovery_service::recovery_status_log() const {
return {_status_log.begin(), _status_log.end()};
}

// Lists the bucket for objects stored under the topic-manifest prefixes and
// returns their keys.
//
// Sixteen prefixes are probed, one per lowercase hex digit, each of the form
// "<digit>0000000/". A prefix whose listing returns an error is logged and
// skipped, so the returned vector may be incomplete; listing errors are not
// reported through the return value.
//
// remote: used to issue the list_objects calls.
// as:     abort source handed to the retry_chain_node built for each prefix.
// cfg:    supplies the bucket, the operation timeout and the backoff.
//
// Returns the key of every listed object, in the order they were appended.
//
// remote, as and cfg are captured by reference by the continuations below, so
// the caller has to keep them alive until the returned future resolves.
//
// NOTE rewritten as continuations to address arm64 miscompilation of coroutines
// under clang-14
static ss::future<std::vector<remote_segment_path>> collect_manifest_paths(
remote& remote, ss::abort_source& as, const recovery_task_config& cfg) {
// Look under each manifest prefix for topic manifests.
constexpr static auto hex_chars = std::string_view{"0123456789abcdef"};
// `paths` accumulates the result; do_with owns it while the loop runs.
return ss::do_with(std::vector<remote_segment_path>{}, [&](auto& paths) {
return ss::do_for_each(
hex_chars,
[&](char hex_ch) {
// Each prefix gets its own retry chain node and its own prefix string,
// both owned by do_with for the duration of the list call.
return ss::do_with(
std::make_unique<retry_chain_node>(
as, cfg.operation_timeout_ms, cfg.backoff_ms),
fmt::format("{}0000000/", hex_ch),
[&](auto& rtc, auto& prefix) {
return remote
.list_objects(
cfg.bucket,
*rtc,
cloud_storage_clients::object_key{prefix})
.then([&](auto meta) {
// A failed listing only drops this prefix; the remaining prefixes are
// still scanned.
if (meta.has_error()) {
vlog(
cst_log.error,
"Failed to list meta items: {}",
meta.error());
return;
}

// Record every key found under this prefix; no filtering happens here.
for (auto&& item : meta.value().contents) {
vlog(
cst_log.trace,
"adding path {} for {}",
item.key,
prefix);
paths.emplace_back(item.key);
}
});
});
})
// Hand the accumulated keys to the caller once all prefixes were visited.
.then([&] { return std::move(paths); });
});
}

ss::future<result<void, recovery_error_ctx>>
topic_recovery_service::start_bg_recovery_task(recovery_request request) {
vlog(cst_log.info, "Starting recovery task with request: {}", request);
Expand Down Expand Up @@ -298,12 +251,59 @@ topic_recovery_service::start_bg_recovery_task(recovery_request request) {
_recovery_request.emplace(request);

set_state(state::scanning_bucket);

vlog(cst_log.debug, "scanning bucket {}", _config.bucket);
auto bucket_contents = co_await collect_manifest_paths(
_remote.local(), _as, _config);
auto fib = make_rtc(_as, _config);
std::optional<std::regex> requested_pattern = std::nullopt;
if (request.topic_names_pattern().has_value()) {
requested_pattern.emplace(
request.topic_names_pattern().value().data(),
request.topic_names_pattern().value().size());
}
const auto requested_topic =
[&requested_pattern](const model::topic_namespace& topic) {
if (!requested_pattern) {
return true;
}
return std::regex_search(topic.tp().c_str(), *requested_pattern);
};

absl::flat_hash_set<model::topic_namespace> existing_topics;
for (auto topic : _topic_state.local().all_topics()) {
if (requested_topic(topic)) {
existing_topics.emplace(std::move(topic));
}
}

auto manifests = co_await filter_existing_topics(
bucket_contents, request, model::ns{"kafka"});
auto should_create = [&requested_topic, &existing_topics](
const model::topic_namespace& topic) {
return requested_topic(topic) && !existing_topics.contains(topic);
};

chunked_vector<topic_manifest> manifests;
auto res
= co_await cloud_storage::topic_manifest_downloader::find_manifests(
_remote.local(),
_config.bucket,
fib,
ss::lowres_clock::now() + _config.operation_timeout_ms,
10ms,
std::move(should_create),
&manifests);
if (res.has_error()) {
_recovery_request = std::nullopt;
set_state(state::inactive);
co_return recovery_error_ctx::make(
fmt::format("failed to create topics: {}", res.error()),
recovery_error_code::error_creating_topics);
}
if (res.value() != find_topic_manifest_outcome::success) {
andrwng marked this conversation as resolved.
Show resolved Hide resolved
_recovery_request = std::nullopt;
set_state(state::inactive);
co_return recovery_error_ctx::make(
"failed to create topics",
recovery_error_code::error_creating_topics);
}

if (manifests.empty()) {
andrwng marked this conversation as resolved.
Show resolved Hide resolved
vlog(cst_log.info, "exiting recovery, no topics to create");
Expand All @@ -321,7 +321,7 @@ topic_recovery_service::start_bg_recovery_task(recovery_request request) {
auto clear_fib = make_rtc(_as, _config);
co_await clear_recovery_results(
_remote.local(), _config.bucket, clear_fib, std::nullopt);
_downloaded_manifests.emplace(manifests);
_downloaded_manifests = std::move(manifests);

populate_recovery_status();

Expand Down Expand Up @@ -426,110 +426,6 @@ topic_recovery_service::create_topics(const recovery_request& request) {
config::shard_local_cfg().create_topic_timeout_ms());
}

// Downloads the topic manifests referenced by `items` that are eligible for
// recovery and returns them.
//
// An entry of `items` is considered only if its key matches
// manifest_path_expr; the namespace and topic name are taken from that match.
// The entry is then skipped when:
//   - the request carries a topic name pattern and the topic name does not
//     match it, or
//   - a topic with the same namespace and name is already present in the
//     topic table.
// Manifests that fail to download are left out of the result.
//
// The third parameter (named filter_ns in the declaration) is unnamed here and
// is not consulted.
ss::future<std::vector<cloud_storage::topic_manifest>>
topic_recovery_service::filter_existing_topics(
  std::vector<remote_segment_path> items,
  const recovery_request& request,
  std::optional<model::ns>) {
    // Names of the topics already in the topic table, grouped by namespace.
    absl::flat_hash_map<ss::sstring, absl::flat_hash_set<ss::sstring>>
      known_topics;
    for (const auto& existing : _topic_state.local().all_topics()) {
        known_topics[existing.ns].insert(existing.tp);
    }

    std::vector<topic_manifest> downloaded;
    downloaded.reserve(items.size());

    // Optional filter on the topic name, compiled once up front.
    std::optional<std::regex> name_filter = std::nullopt;
    if (request.topic_names_pattern().has_value()) {
        name_filter.emplace(
          request.topic_names_pattern().value().data(),
          request.topic_names_pattern().value().size());
    }

    for (const auto& candidate : items) {
        // The match is what yields the namespace and topic name, so it is
        // needed even for keys already known to be topic manifests.
        const auto& key = candidate().string();
        std::smatch parts;
        if (!std::regex_match(
              key.cbegin(), key.cend(), parts, manifest_path_expr)) {
            continue;
        }

        const auto ns_name = parts[1].str();
        const auto topic_name = parts[2].str();

        if (
          name_filter.has_value()
          && !std::regex_search(topic_name, name_filter.value())) {
            vlog(
              cst_log.debug,
              "will skip topic {}, it does not match pattern {}",
              topic_name,
              request.topic_names_pattern().value());
            continue;
        }

        const auto ns_it = known_topics.find(ns_name);
        if (
          ns_it != known_topics.end()
          && ns_it->second.contains(topic_name)) {
            vlog(
              cst_log.debug,
              "will skip creating {}:{}, topic already exists",
              ns_name,
              topic_name);
            continue;
        }

        // A failed download simply leaves this topic out of the result.
        auto download_r = co_await download_manifest(key);
        if (download_r.has_value()) {
            downloaded.push_back(std::move(download_r.value()));
        }
    }
    co_return downloaded;
}

// Downloads the topic manifest stored at `path` in the configured bucket.
//
// The expected format is JSON when `path` ends with "json" and serde
// otherwise. Returns the manifest on success. If the download does not report
// success, or a std::exception is thrown while downloading, the failure is
// logged at error level and returned as a recovery_error_ctx with code
// error_downloading_manifest.
ss::future<result<cloud_storage::topic_manifest, recovery_error_ctx>>
topic_recovery_service::download_manifest(ss::sstring path) {
    cloud_storage::topic_manifest manifest;
    auto rtc = make_rtc(_as, _config);
    const auto format = path.ends_with("json") ? manifest_format::json
                                               : manifest_format::serde;

    // Builds the error for this path/format, logs it and hands it back.
    const auto fail = [&path, format](const auto& reason) {
        auto err = recovery_error_ctx::make(
          fmt::format(
            "failed to download manifest from {} format {}: {}",
            path,
            format,
            reason),
          recovery_error_code::error_downloading_manifest);
        vlog(cst_log.error, "{}", err.context);
        return err;
    };

    try {
        auto outcome = co_await _remote.local().download_manifest(
          _config.bucket,
          {format, remote_manifest_path{path}},
          manifest,
          rtc);
        if (outcome == download_result::success) {
            co_return manifest;
        }
        co_return fail(outcome);
    } catch (const std::exception& ex) {
        co_return fail(ex.what());
    }
}

void topic_recovery_service::start_download_bg_tracker() {
_pending_status_timer.set_callback([this] {
ssx::spawn_with_gate(_gate, [this] { return check_for_downloads(); });
Expand Down
8 changes: 2 additions & 6 deletions src/v/cluster/topic_recovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/remote.h"
#include "cloud_storage/topic_manifest.h"
#include "cluster/types.h"
#include "container/fragmented_vector.h"
#include "model/fundamental.h"

#include <seastar/core/gate.hh>
Expand Down Expand Up @@ -127,11 +128,6 @@ struct topic_recovery_service
const recovery_request& request,
std::optional<model::ns> filter_ns);

/// \brief Try to download a manifest JSON file, parse it and return the
/// parsed manifest
ss::future<result<cloud_storage::topic_manifest, recovery_error_ctx>>
download_manifest(ss::sstring path);

ss::future<std::vector<cluster::topic_result>>
create_topics(const recovery_request& request);

Expand Down Expand Up @@ -192,7 +188,7 @@ struct topic_recovery_service
// once the recovery has ended. One example is the topic retention which
// could be set to some small value during recovery and restored back to
// original value from manifest once recovery has ended.
std::optional<std::vector<topic_manifest>> _downloaded_manifests;
std::optional<chunked_vector<topic_manifest>> _downloaded_manifests;

boost::circular_buffer<recovery_status> _status_log;
};
Expand Down