Skip to content

Commit

Permalink
cluster: plug cluster uuid into new topics
Browse files Browse the repository at this point in the history
This only takes effect for topics that aren't being read or restored
from the cloud.
  • Loading branch information
andrwng committed Jul 1, 2024
1 parent 694572e commit d1a4da8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ ss::future<> controller::start(
std::ref(_partition_manager),
std::ref(_shard_table),
std::ref(_shard_balancer),
std::ref(_storage),
ss::sharded_parameter(
[this] { return std::ref(_data_migrated_resources.local()); }),
ss::sharded_parameter([this] { return std::ref(_plugin_table.local()); }),
Expand Down
21 changes: 21 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ topics_frontend::topics_frontend(
ss::sharded<partition_manager>& pm,
ss::sharded<shard_table>& shard_table,
ss::sharded<shard_balancer>& sb,
ss::sharded<storage::api>& storage,
data_migrations::migrated_resources& migrated_resources,
plugin_table& plugin_table,
metadata_cache& metadata_cache,
Expand All @@ -95,6 +96,7 @@ topics_frontend::topics_frontend(
, _cloud_storage_api(cloud_storage_api)
, _features(features)
, _shard_balancer(sb)
, _storage(storage)
, _plugin_table(plugin_table)
, _metadata_cache(metadata_cache)
, _members_table(members_table)
Expand Down Expand Up @@ -555,6 +557,25 @@ ss::future<topic_result> topics_frontend::do_create_topic(
assignable_config.cfg.tp_ns,
assignable_config.cfg);
}
bool configured_label_from_manifest
= assignable_config.is_read_replica()
|| assignable_config.is_recovery_enabled();
if (
!configured_label_from_manifest
&& !assignable_config.cfg.properties.remote_label.has_value()
&& _storage.local().get_cluster_uuid().has_value()
&& _features.local().is_active(features::feature::remote_labels)
&& !config::shard_local_cfg()
.cloud_storage_disable_remote_labels_for_tests.value()) {
auto remote_label = std::make_optional<cloud_storage::remote_label>(
_storage.local().get_cluster_uuid().value());
assignable_config.cfg.properties.remote_label = remote_label;
vlog(
clusterlog.debug,
"Configuring topic {} with remote label {}",
assignable_config.cfg.tp_ns,
remote_label);
}

auto units = co_await _allocator.invoke_on(
partition_allocator::shard,
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "model/timeout_clock.h"
#include "partition_balancer_types.h"
#include "rpc/fwd.h"
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
Expand Down Expand Up @@ -67,6 +68,7 @@ class topics_frontend {
ss::sharded<partition_manager>&,
ss::sharded<shard_table>&,
ss::sharded<shard_balancer>&,
ss::sharded<storage::api>&,
data_migrations::migrated_resources&,
plugin_table&,
metadata_cache&,
Expand Down Expand Up @@ -302,6 +304,7 @@ class topics_frontend {
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
ss::sharded<features::feature_table>& _features;
ss::sharded<shard_balancer>& _shard_balancer;
ss::sharded<storage::api>& _storage;
plugin_table& _plugin_table;
metadata_cache& _metadata_cache;

Expand Down

0 comments on commit d1a4da8

Please sign in to comment.