Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: remote labels #20778

Merged
merged 34 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44978ef
offline_log_viewer: support remote_labels in topic properties
andrwng Jun 20, 2024
7d38dba
cluster: add remote label as topic property
andrwng Jun 12, 2024
fab93f1
archival_stm: add remote path provider to archival stm
andrwng Jun 29, 2024
cda86a5
ntp_archiver: use path provider for naming
andrwng Jun 20, 2024
160a40e
cloud_storage: plumb path provider into async manifest view
andrwng Jun 20, 2024
291ed3f
cluster: use topic_manifest_downloader in topic recovery
andrwng Jun 17, 2024
b6d1e16
cluster: use topic_manifest_downloader in list-based recovery
andrwng Jun 25, 2024
41df563
archival_stm: use partition_manifest_downloader for snapshot recovery
andrwng Jun 18, 2024
e16c247
ntp_archiver: use partition_manifest_downlaoder for read replicas
andrwng Jun 17, 2024
13a1567
cluster: use partition_manifest_downloader in partition recovery
andrwng Jun 17, 2024
fe640c9
cluster: use partition_manifest_downloader in topic recovery validation
andrwng Jun 20, 2024
9d97ff6
cluster: use partition_manifest_downloader for unsafe reset from cloud
andrwng Jun 17, 2024
8cb5db2
cloud_storage: use partition_manifest_downloader in scrubber/anomaly …
andrwng Jun 20, 2024
ebf2fa1
remote_partition: use partition_manifest_downloader for manifest fina…
andrwng Jun 17, 2024
653c9ec
cloud_storage: remove try_download_partition_manifest
andrwng Jun 17, 2024
eaa7b95
cloud_storage: remove remote::partition_manifest_exists()
andrwng Jun 20, 2024
849aef7
cloud_storage: use path provider for spillovers in manifest view
andrwng Jun 19, 2024
5248547
remote_partition: use path provider for segment paths
andrwng Jun 18, 2024
3937454
cloud_storage: add utils for lifecycle marker paths
andrwng Jun 29, 2024
d10a60d
archival: use path provider to generate lifecycle marker paths
andrwng Jun 29, 2024
5d30320
archival: use the path provider throughout the purger
andrwng Jun 18, 2024
a0be06a
archival/segment_merger: use path provider for remote runs
andrwng Jun 18, 2024
d2681ba
admin: use path provider for anomalies report
andrwng Jun 19, 2024
1066344
features: add flag for remote labels
andrwng Jul 4, 2024
2d7e19a
config: property for disabling remote labels for tests
andrwng Jun 20, 2024
e7669a9
cloud_storage: clean up manifest path generation from manifests
andrwng Jun 20, 2024
46ed453
cloud_storage: clean up remote segment name generation
andrwng Jun 29, 2024
459106c
topic_manifest: remove feature table
andrwng Jun 22, 2024
902f522
rptest/services: make get_cluster_uuid() node optional
andrwng Jun 24, 2024
245b4f2
rptest: support remote labels in BucketView
andrwng Jun 24, 2024
df36562
rptest: option to change si_settings bucket
andrwng Jul 1, 2024
45c62a4
cluster: plug cluster uuid into new topics
andrwng Jun 29, 2024
b6ae520
config: enable remote labels by default
andrwng Jul 3, 2024
263d4b3
rptest: add remote_label_test
andrwng Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/v/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "bytes/iostream.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/types.h"
#include "cluster/errc.h"
#include "cluster/logger.h"
Expand Down Expand Up @@ -641,14 +642,16 @@ archival_metadata_stm::archival_metadata_stm(
raft::consensus* raft,
cloud_storage::remote& remote,
features::feature_table& ft,
ss::logger& logger)
ss::logger& logger,
std::optional<cloud_storage::remote_label> remote_label)
: raft::persisted_stm<>(archival_stm_snapshot, logger, raft)
, _logger(logger, ssx::sformat("ntp: {}", raft->ntp()))
, _mem_tracker(ss::make_shared<util::mem_tracker>(raft->ntp().path()))
, _manifest(ss::make_shared<cloud_storage::partition_manifest>(
raft->ntp(), raft->log_config().get_initial_revision(), _mem_tracker))
, _cloud_storage_api(remote)
, _feature_table(ft) {}
, _feature_table(ft)
, _remote_path_provider({remote_label}) {}

ss::future<std::error_code> archival_metadata_stm::truncate(
model::offset start_rp_offset,
Expand Down Expand Up @@ -1687,10 +1690,12 @@ archival_metadata_stm::state_dirty archival_metadata_stm::get_dirty(
archival_metadata_stm_factory::archival_metadata_stm_factory(
bool cloud_storage_enabled,
ss::sharded<cloud_storage::remote>& cloud_storage_api,
ss::sharded<features::feature_table>& feature_table)
ss::sharded<features::feature_table>& feature_table,
ss::sharded<cluster::topic_table>& topics)
: _cloud_storage_enabled(cloud_storage_enabled)
, _cloud_storage_api(cloud_storage_api)
, _feature_table(feature_table) {}
, _feature_table(feature_table)
, _topics(topics) {}

bool archival_metadata_stm_factory::is_applicable_for(
const storage::ntp_config& ntp_cfg) const {
Expand All @@ -1700,8 +1705,18 @@ bool archival_metadata_stm_factory::is_applicable_for(

void archival_metadata_stm_factory::create(
raft::state_machine_manager_builder& builder, raft::consensus* raft) {
auto topic_md = _topics.local().get_topic_metadata_ref(
model::topic_namespace_view(raft->ntp()));
auto remote_label
= topic_md.has_value()
? topic_md->get().get_configuration().properties.remote_label
: std::nullopt;
auto stm = builder.create_stm<cluster::archival_metadata_stm>(
raft, _cloud_storage_api.local(), _feature_table.local(), clusterlog);
raft,
_cloud_storage_api.local(),
_feature_table.local(),
clusterlog,
remote_label);
raft->log()->stm_manager()->add_stm(stm);
}

Expand Down
14 changes: 12 additions & 2 deletions src/v/archival/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

#include "cloud_storage/fwd.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/types.h"
#include "cluster/errc.h"
#include "cluster/state_machine_registry.h"
#include "cluster/topic_table.h"
#include "features/fwd.h"
#include "model/fundamental.h"
#include "model/record.h"
Expand Down Expand Up @@ -133,7 +135,8 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
raft::consensus*,
cloud_storage::remote& remote,
features::feature_table&,
ss::logger& logger);
ss::logger& logger,
std::optional<cloud_storage::remote_label>);
WillemKauf marked this conversation as resolved.
Show resolved Hide resolved

/// Add segments to the raft log, replicate them and
/// wait until it is applied to the STM.
Expand Down Expand Up @@ -277,6 +280,10 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
return _compacted_replaced_bytes;
}

const cloud_storage::remote_path_provider& path_provider() const {
return _remote_path_provider;
}

private:
ss::future<bool>
do_sync(model::timeout_clock::duration timeout, ss::abort_source* as);
Expand Down Expand Up @@ -374,6 +381,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {

cloud_storage::remote& _cloud_storage_api;
features::feature_table& _feature_table;
const cloud_storage::remote_path_provider _remote_path_provider;
ss::abort_source _download_as;

// for observability: keep track of the number of cloud bytes "removed" by
Expand All @@ -389,7 +397,8 @@ class archival_metadata_stm_factory : public state_machine_factory {
archival_metadata_stm_factory(
bool cloud_storage_enabled,
ss::sharded<cloud_storage::remote>&,
ss::sharded<features::feature_table>&);
ss::sharded<features::feature_table>&,
ss::sharded<cluster::topic_table>&);

bool is_applicable_for(const storage::ntp_config&) const final;
void create(raft::state_machine_manager_builder&, raft::consensus*) final;
Expand All @@ -398,6 +407,7 @@ class archival_metadata_stm_factory : public state_machine_factory {
bool _cloud_storage_enabled;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<topic_table>& _topics;
};

} // namespace cluster
3 changes: 2 additions & 1 deletion src/v/archival/tests/archival_metadata_stm_gtest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class archival_metadata_stm_gtest_fixture : public raft::raft_fixture {
node->raft().get(),
stm_node.remote.local(),
node->get_feature_table().local(),
fixture_logger);
fixture_logger,
std::nullopt);

stm_node.archival_stm = std::move(stm);

Expand Down
24 changes: 20 additions & 4 deletions src/v/archival/tests/archival_metadata_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ struct archival_metadata_stm_fixture : archival_metadata_stm_base_fixture {
create_raft();
raft::state_machine_manager_builder builder;
archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);
_raft.get(),
cloud_api.local(),
feature_table.local(),
logger,
std::nullopt);

_raft->start(std::move(builder)).get();
_started = true;
Expand Down Expand Up @@ -351,7 +355,11 @@ FIXTURE_TEST(test_snapshot_loading, archival_metadata_stm_base_fixture) {

raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);
_raft.get(),
cloud_api.local(),
feature_table.local(),
logger,
std::nullopt);
_raft->start(std::move(builder)).get();
_started = true;
wait_for_confirmed_leader();
Expand Down Expand Up @@ -446,7 +454,11 @@ FIXTURE_TEST(test_sname_derivation, archival_metadata_stm_base_fixture) {

raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);
_raft.get(),
cloud_api.local(),
feature_table.local(),
logger,
std::nullopt);

_raft->start(std::move(builder)).get();
_started = true;
Expand Down Expand Up @@ -655,7 +667,11 @@ FIXTURE_TEST(

raft::state_machine_manager_builder builder;
auto archival_stm = builder.create_stm<cluster::archival_metadata_stm>(
_raft.get(), cloud_api.local(), feature_table.local(), logger);
_raft.get(),
cloud_api.local(),
feature_table.local(),
logger,
std::nullopt);
_raft->start(std::move(builder)).get();
_started = true;
wait_for_confirmed_leader();
Expand Down
3 changes: 2 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2685,7 +2685,8 @@ void application::start_runtime_services(
pm.register_factory<cluster::archival_metadata_stm_factory>(
config::shard_local_cfg().cloud_storage_enabled(),
cloud_storage_api,
feature_table);
feature_table,
controller->get_topics_state());
pm.register_factory<kafka::group_tx_tracker_stm_factory>();
})
.get();
Expand Down