diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 43a68cd3e7e3f..4465d27b409bd 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -11,6 +11,8 @@ #include "cluster/bootstrap_backend.h" #include "cluster/cloud_metadata/cluster_recovery_backend.h" +#include "cluster/cloud_metadata/error_outcome.h" +#include "cluster/cloud_metadata/manifest_downloads.h" #include "cluster/cloud_metadata/uploader.h" #include "cluster/cluster_discovery.h" #include "cluster/cluster_recovery_table.h" @@ -109,6 +111,17 @@ controller::controller( // includes for destructors of all its members (e.g. the metadata uploader). controller::~controller() = default; +std::optional +controller::get_configured_bucket() { + auto& bucket_property = cloud_storage::configuration::get_bucket_config(); + if ( + !bucket_property.is_overriden() || !bucket_property().has_value() + || !_cloud_storage_api.local_is_initialized()) { + return std::nullopt; + } + return cloud_storage_clients::bucket_name(bucket_property().value()); +} + ss::future<> controller::wire_up() { return _as.start() .then([this] { return _members_table.start(); }) @@ -606,14 +619,11 @@ controller::start(cluster_discovery& discovery, ss::abort_source& shard0_as) { &partition_balancer_backend::start); }) .then([this] { - auto& bucket_property - = cloud_storage::configuration::get_bucket_config(); - if ( - !bucket_property.is_overriden() || !bucket_property().has_value() - || !_cloud_storage_api.local_is_initialized()) { + auto bucket_opt = get_configured_bucket(); + if (!bucket_opt.has_value()) { return; } - cloud_storage_clients::bucket_name bucket(bucket_property().value()); + cloud_storage_clients::bucket_name bucket = bucket_opt.value(); _metadata_uploader = std::make_unique( _raft_manager.local(), _storage.local(), @@ -735,8 +745,28 @@ ss::future<> controller::stop() { }); } -ss::future<> -controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { +ss::future +controller::find_cluster_manifest_in_cloud() { + if (!config::shard_local_cfg() + .cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) { + co_return cloud_metadata::error_outcome::no_matching_metadata; + } + auto& bucket_property = cloud_storage::configuration::get_bucket_config(); + if ( + !bucket_property.is_overriden() || !bucket_property().has_value() + || !_cloud_storage_api.local_is_initialized()) { + vlog( + clusterlog.debug, + "Cloud not configured, skipping cluster recovery check"); + co_return cloud_metadata::error_outcome::no_matching_metadata; + } + cloud_storage_clients::bucket_name bucket(bucket_property().value()); + retry_chain_node retry_node(_as.local()); + co_return co_await cloud_metadata::download_highest_manifest_in_bucket( + _cloud_storage_api.local(), bucket, retry_node); +} + +ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) { vassert( ss::this_shard_id() == controller_stm_shard, "Cluster can only be created from controller_stm_shard"); @@ -764,6 +794,44 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { co_return; } + // Check if there is any cluster metadata in the cloud. + auto bucket_opt = get_configured_bucket(); + if (bucket_opt.has_value()) { + retry_chain_node retry_node(_as.local(), 300s, 5s); + auto res + = co_await cloud_metadata::download_highest_manifest_in_bucket( + _cloud_storage_api.local(), bucket_opt.value(), retry_node); + if (res.has_value()) { + vlog( + clusterlog.info, + "Found cluster metadata manifest {} in bucket {}", + res.value(), + bucket_opt.value()); + cmd_data.recovery_manifest = std::move(res.value()); + cmd_data.recovery_bucket = bucket_opt.value(); + } else { + const auto& err = res.error(); + if ( + err == cloud_metadata::error_outcome::no_matching_metadata) { + vlog( + clusterlog.info, + "No cluster manifest in bucket {}, proceeding without " + "recovery", + bucket_opt.value()); + } else { + vlog( + clusterlog.error, + "Error looking for cluster recovery material in cloud, " + "retrying: {}", + err); + co_await ss::sleep_abortable( + retry_jitter.next_duration(), _as.local()); + continue; + } + // No metadata in cloud: this isn't a recovery cluster. + } + } + vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid); const std::error_code errc = co_await replicate_and_wait( _stm, diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index 5c43807aab5cb..0787887611205 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -12,6 +12,7 @@ #pragma once #include "cloud_storage/fwd.h" +#include "cluster/cloud_metadata/cluster_manifest.h" #include "cluster/controller_probe.h" #include "cluster/controller_stm.h" #include "cluster/fwd.h" @@ -234,6 +235,18 @@ class controller { ss::future<> cluster_creation_hook(cluster_discovery& discovery); + std::optional get_configured_bucket(); + + /** + * Looks in the cloud for a cluster metadata manifest with which to recover + * a cluster. + * + * Returns \c no_matching_metadata if there is no metadata or cloud isn't + * configured. + */ + ss::future + find_cluster_manifest_in_cloud(); + // Checks configuration invariants stored in kvstore ss::future<> validate_configuration_invariants(); diff --git a/tests/rptest/tests/cluster_recovery_test.py b/tests/rptest/tests/cluster_recovery_test.py index aeb0605f261ee..805ac2ded19e1 100644 --- a/tests/rptest/tests/cluster_recovery_test.py +++ b/tests/rptest/tests/cluster_recovery_test.py @@ -131,3 +131,50 @@ def cluster_recovery_complete(): if u in l and "ALLOW" in l and "DESCRIBE" in l: found = True assert found, f"Couldn't find {u} in {acls_lines}" + + @cluster(num_nodes=4) + def test_bootstrap_with_recovery(self): + """ + Smoke test that configuring automated recovery at bootstrap will kick + in as appropriate. + """ + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set( + "cloud_storage_attempt_cluster_recovery_on_bootstrap", True) + for t in self.topics: + KgoVerifierProducer.oneshot(self.test_context, + self.redpanda, + t.name, + self.message_size, + 100, + batch_max_bytes=self.message_size * 8, + timeout_sec=60) + quiesce_uploads(self.redpanda, [t.name for t in self.topics], + timeout_sec=60) + time.sleep(5) + + self.redpanda.stop() + for n in self.redpanda.nodes: + self.redpanda.remove_local_data(n) + + # Restart the nodes, overriding the recovery bootstrap config. + extra_rp_conf = dict( + cloud_storage_attempt_cluster_recovery_on_bootstrap=True) + self.redpanda.set_extra_rp_conf(extra_rp_conf) + self.redpanda.write_bootstrap_cluster_config() + self.redpanda.restart_nodes(self.redpanda.nodes, + override_cfg_params=extra_rp_conf) + + # We should see a recovery begin automatically. + self.redpanda._admin.await_stable_leader("controller", + partition=0, + namespace='redpanda', + timeout_s=60, + backoff_s=2) + + def cluster_recovery_complete(): + return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status( + ).json()["state"] + + wait_until(cluster_recovery_complete, timeout_sec=30, backoff_sec=1) + self.redpanda.restart_nodes(self.redpanda.nodes)