Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: remote labels #20778

Merged
merged 34 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44978ef
offline_log_viewer: support remote_labels in topic properties
andrwng Jun 20, 2024
7d38dba
cluster: add remote label as topic property
andrwng Jun 12, 2024
fab93f1
archival_stm: add remote path provider to archival stm
andrwng Jun 29, 2024
cda86a5
ntp_archiver: use path provider for naming
andrwng Jun 20, 2024
160a40e
cloud_storage: plumb path provider into async manifest view
andrwng Jun 20, 2024
291ed3f
cluster: use topic_manifest_downloader in topic recovery
andrwng Jun 17, 2024
b6d1e16
cluster: use topic_manifest_downloader in list-based recovery
andrwng Jun 25, 2024
41df563
archival_stm: use partition_manifest_downloader for snapshot recovery
andrwng Jun 18, 2024
e16c247
ntp_archiver: use partition_manifest_downlaoder for read replicas
andrwng Jun 17, 2024
13a1567
cluster: use partition_manifest_downloader in partition recovery
andrwng Jun 17, 2024
fe640c9
cluster: use partition_manifest_downloader in topic recovery validation
andrwng Jun 20, 2024
9d97ff6
cluster: use partition_manifest_downloader for unsafe reset from cloud
andrwng Jun 17, 2024
8cb5db2
cloud_storage: use partition_manifest_downloader in scrubber/anomaly …
andrwng Jun 20, 2024
ebf2fa1
remote_partition: use partition_manifest_downloader for manifest fina…
andrwng Jun 17, 2024
653c9ec
cloud_storage: remove try_download_partition_manifest
andrwng Jun 17, 2024
eaa7b95
cloud_storage: remove remote::partition_manifest_exists()
andrwng Jun 20, 2024
849aef7
cloud_storage: use path provider for spillovers in manifest view
andrwng Jun 19, 2024
5248547
remote_partition: use path provider for segment paths
andrwng Jun 18, 2024
3937454
cloud_storage: add utils for lifecycle marker paths
andrwng Jun 29, 2024
d10a60d
archival: use path provider to generate lifecycle marker paths
andrwng Jun 29, 2024
5d30320
archival: use the path provider throughout the purger
andrwng Jun 18, 2024
a0be06a
archival/segment_merger: use path provider for remote runs
andrwng Jun 18, 2024
d2681ba
admin: use path provider for anomalies report
andrwng Jun 19, 2024
1066344
features: add flag for remote labels
andrwng Jul 4, 2024
2d7e19a
config: property for disabling remote labels for tests
andrwng Jun 20, 2024
e7669a9
cloud_storage: clean up manifest path generation from manifests
andrwng Jun 20, 2024
46ed453
cloud_storage: clean up remote segment name generation
andrwng Jun 29, 2024
459106c
topic_manifest: remove feature table
andrwng Jun 22, 2024
902f522
rptest/services: make get_cluster_uuid() node optional
andrwng Jun 24, 2024
245b4f2
rptest: support remote labels in BucketView
andrwng Jun 24, 2024
df36562
rptest: option to change si_settings bucket
andrwng Jul 1, 2024
45c62a4
cluster: plug cluster uuid into new topics
andrwng Jun 29, 2024
b6ae520
config: enable remote labels by default
andrwng Jul 3, 2024
263d4b3
rptest: add remote_label_test
andrwng Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "archival/archival_metadata_stm.h"
#include "base/outcome.h"
#include "base/vassert.h"
#include "cloud_storage/remote_path_provider.h"
#include "cluster/cluster_utils.h"
#include "cluster/errc.h"
#include "cluster/fwd.h"
Expand All @@ -36,6 +37,7 @@
#include "ssx/event.h"
#include "ssx/future-util.h"
#include "storage/offset_translator.h"
#include "types.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -1402,7 +1404,10 @@ ss::future<std::error_code> controller_backend::create_partition(
group_id,
std::move(initial_brokers),
cfg->properties.remote_topic_properties,
read_replica_bucket);
read_replica_bucket,
raft::with_learner_recovery_throttle::yes,
raft::keep_snapshotted_log::no,
cfg->properties.remote_label);

co_await add_to_shard_table(
ntp, group_id, ss::this_shard_id(), log_revision);
Expand Down
17 changes: 13 additions & 4 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/remote_partition.h"
#include "cloud_storage/remote_path_provider.h"
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/partition.h"
Expand Down Expand Up @@ -117,9 +119,11 @@ ss::future<consensus_ptr> partition_manager::manage(
std::optional<remote_topic_properties> rtp,
std::optional<cloud_storage_clients::bucket_name> read_replica_bucket,
raft::with_learner_recovery_throttle enable_learner_recovery_throttle,
raft::keep_snapshotted_log keep_snapshotted_log) {
raft::keep_snapshotted_log keep_snapshotted_log,
std::optional<cloud_storage::remote_label> remote_label) {
auto guard = _gate.hold();
auto dl_result = co_await maybe_download_log(ntp_cfg, rtp);
cloud_storage::remote_path_provider path_provider(remote_label);
auto dl_result = co_await maybe_download_log(ntp_cfg, rtp, path_provider);
auto& [logs_recovered, clean_download, min_offset, max_offset, manifest, ot_state]
= dl_result;
if (logs_recovered) {
Expand Down Expand Up @@ -266,10 +270,15 @@ ss::future<consensus_ptr> partition_manager::manage(

ss::future<cloud_storage::log_recovery_result>
partition_manager::maybe_download_log(
storage::ntp_config& ntp_cfg, std::optional<remote_topic_properties> rtp) {
storage::ntp_config& ntp_cfg,
std::optional<remote_topic_properties> rtp,
cloud_storage::remote_path_provider& path_provider) {
if (rtp.has_value() && _partition_recovery_mgr.local_is_initialized()) {
auto res = co_await _partition_recovery_mgr.local().download_log(
ntp_cfg, rtp->remote_revision, rtp->remote_partition_count);
ntp_cfg,
rtp->remote_revision,
rtp->remote_partition_count,
path_provider);
co_return res;
}
vlog(
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "archival/fwd.h"
#include "cloud_storage/fwd.h"
#include "cloud_storage/remote_path_provider.h"
#include "cluster/fwd.h"
#include "cluster/ntp_callbacks.h"
#include "cluster/partition.h"
Expand Down Expand Up @@ -94,7 +95,8 @@ class partition_manager
std::optional<cloud_storage_clients::bucket_name> = std::nullopt,
raft::with_learner_recovery_throttle
= raft::with_learner_recovery_throttle::yes,
raft::keep_snapshotted_log = raft::keep_snapshotted_log::no);
raft::keep_snapshotted_log = raft::keep_snapshotted_log::no,
std::optional<cloud_storage::remote_label> = std::nullopt);

ss::future<> shutdown(const model::ntp& ntp);

Expand Down Expand Up @@ -247,7 +249,9 @@ class partition_manager
/// \param ntp_cfg is an ntp_config instance to recover
/// \return true if the recovery was invoked, false otherwise
ss::future<cloud_storage::log_recovery_result> maybe_download_log(
storage::ntp_config& ntp_cfg, std::optional<remote_topic_properties> rtp);
storage::ntp_config& ntp_cfg,
std::optional<remote_topic_properties> rtp,
cloud_storage::remote_path_provider& path_provider);

ss::future<> do_shutdown(ss::lw_shared_ptr<partition>);

Expand Down
48 changes: 31 additions & 17 deletions src/v/cluster/partition_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

#include "bytes/streambuf.h"
#include "cloud_storage/logger.h"
#include "cloud_storage/partition_manifest_downloader.h"
#include "cloud_storage/recovery_utils.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/topic_manifest.h"
#include "cloud_storage/types.h"
#include "cluster/topic_recovery_status_frontend.h"
Expand Down Expand Up @@ -85,7 +88,8 @@ ss::future<> partition_recovery_manager::stop() {
ss::future<log_recovery_result> partition_recovery_manager::download_log(
const storage::ntp_config& ntp_cfg,
model::initial_revision_id remote_revision,
int32_t remote_partition_count) {
int32_t remote_partition_count,
cloud_storage::remote_path_provider& path_provider) {
if (!ntp_cfg.has_overrides()) {
vlog(
cst_log.debug, "No overrides for {} found, skipping", ntp_cfg.ntp());
Expand All @@ -101,6 +105,7 @@ ss::future<log_recovery_result> partition_recovery_manager::download_log(
}
partition_downloader downloader(
ntp_cfg,
path_provider,
&_remote.local(),
remote_revision,
remote_partition_count,
Expand All @@ -115,7 +120,7 @@ ss::future<log_recovery_result> partition_recovery_manager::download_log(
cst_log.debug,
"topic recovery service is active, uploading result: {} for {}",
result.logs_recovered,
result.manifest.get_manifest_path());
result.manifest.get_manifest_path(path_provider));
co_await cloud_storage::place_download_result(
_remote.local(), _bucket, ntp_cfg, result.logs_recovered, fib);
}
Expand Down Expand Up @@ -149,6 +154,7 @@ void partition_recovery_manager::set_topic_recovery_components(

partition_downloader::partition_downloader(
const storage::ntp_config& ntpc,
const cloud_storage::remote_path_provider& path_provider,
remote* remote,
model::initial_revision_id remote_rev_id,
int32_t remote_partition_count,
Expand All @@ -157,6 +163,7 @@ partition_downloader::partition_downloader(
retry_chain_node& parent,
storage::opt_abort_source_t as)
: _ntpc(ntpc)
, _remote_path_provider(path_provider)
, _bucket(std::move(bucket))
, _remote(remote)
, _remote_revision_id(remote_rev_id)
Expand Down Expand Up @@ -604,20 +611,27 @@ partition_downloader::find_recovery_material() {
vlog(
_ctxlog.info,
"Downloading partition manifest {}",
andrwng marked this conversation as resolved.
Show resolved Hide resolved
tmp.get_manifest_path());
auto [res, res_fmt] = co_await _remote->try_download_partition_manifest(
_bucket, tmp, _rtcnode);
if (res == download_result::success) {
recovery_mat.partition_manifest = std::move(tmp);
co_return recovery_mat;
tmp.get_manifest_path(_remote_path_provider));
cloud_storage::partition_manifest_downloader dl(
_bucket,
_remote_path_provider,
_ntpc.ntp(),
_remote_revision_id,
*_remote);
auto download_res = co_await dl.download_manifest(_rtcnode, &tmp);
if (download_res.has_error()) {
throw std::runtime_error(fmt_with_ctx(
fmt::format, "Can't download manifest: {}", download_res.error()));
}
if (res == download_result::notfound) {
// Manifest is not available in the cloud
throw missing_partition_exception(tmp.get_manifest_path(res_fmt));
if (
download_res.value()
== find_partition_manifest_outcome::no_matching_manifest) {
throw missing_partition_exception(
remote_manifest_path{_remote_path_provider.partition_manifest_path(
_ntpc.ntp(), _remote_revision_id)});
}
// Some other, possibly transient error
throw std::runtime_error(
fmt_with_ctx(fmt::format, "Can't download manifest: {}", res));
recovery_mat.partition_manifest = std::move(tmp);
co_return recovery_mat;
}

static ss::future<ss::output_stream<char>>
Expand Down Expand Up @@ -669,9 +683,6 @@ partition_downloader::download_segment_file(
const segment_meta& segm, const download_part& part) {
auto name = generate_local_segment_name(
segm.base_offset, segm.segment_term);
auto remote_path = partition_manifest::generate_remote_segment_path(
_ntpc.ntp(), segm);

auto localpath = part.part_prefix / std::filesystem::path(name());

vlog(
Expand All @@ -696,6 +707,9 @@ partition_downloader::download_segment_file(
}

auto stream_stats = cloud_storage::stream_stats{};
auto remote_path = cloud_storage::remote_segment_path(
_remote_path_provider.segment_path(
_ntpc.ntp(), _ntpc.get_initial_revision(), segm));

auto stream = [this,
&stream_stats,
Expand Down
7 changes: 6 additions & 1 deletion src/v/cluster/partition_recovery_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include "cloud_storage/offset_translation_layer.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/remote_path_provider.h"
#include "model/metadata.h"
#include "model/record.h"
#include "storage/ntp_config.h"
Expand Down Expand Up @@ -77,7 +79,8 @@ class partition_recovery_manager {
ss::future<log_recovery_result> download_log(
const storage::ntp_config& ntp_cfg,
model::initial_revision_id remote_revsion,
int32_t remote_partition_count);
int32_t remote_partition_count,
cloud_storage::remote_path_provider& path_provider);

void set_topic_recovery_components(
ss::sharded<cluster::topic_recovery_status_frontend>&
Expand Down Expand Up @@ -110,6 +113,7 @@ class partition_downloader {
public:
partition_downloader(
const storage::ntp_config& ntpc,
const cloud_storage::remote_path_provider& path_provider,
remote* remote,
model::initial_revision_id remote_revision_id,
int32_t remote_partition_count,
Expand Down Expand Up @@ -216,6 +220,7 @@ class partition_downloader {
read_first_record_header(const std::filesystem::path& path);

const storage::ntp_config& _ntpc;
const cloud_storage::remote_path_provider& _remote_path_provider;
cloud_storage_clients::bucket_name _bucket;
remote* _remote;
model::initial_revision_id _remote_revision_id;
Expand Down