From ab17ca12fee5246ef5cccfa18ee763c24f23a855 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 20 Nov 2023 21:12:46 -0800 Subject: [PATCH] cluster: look for cluster manifest and start recovery on bootstrap This updates the cluster bootstrap sequence such that, when configured, Redpanda will look for cluster metadata in the cloud and then immediately start a recovery if any is found. --- src/v/cluster/controller.cc | 84 +++++++++++++++++++-- src/v/cluster/controller.h | 13 ++++ tests/rptest/tests/cluster_recovery_test.py | 47 ++++++++++++ 3 files changed, 136 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 43a68cd3e7e3f..4465d27b409bd 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -11,6 +11,8 @@ #include "cluster/bootstrap_backend.h" #include "cluster/cloud_metadata/cluster_recovery_backend.h" +#include "cluster/cloud_metadata/error_outcome.h" +#include "cluster/cloud_metadata/manifest_downloads.h" #include "cluster/cloud_metadata/uploader.h" #include "cluster/cluster_discovery.h" #include "cluster/cluster_recovery_table.h" @@ -109,6 +111,17 @@ controller::controller( // includes for destructors of all its members (e.g. the metadata uploader). controller::~controller() = default; +std::optional +controller::get_configured_bucket() { + auto& bucket_property = cloud_storage::configuration::get_bucket_config(); + if ( + !bucket_property.is_overriden() || !bucket_property().has_value() + || !_cloud_storage_api.local_is_initialized()) { + return std::nullopt; + } + return cloud_storage_clients::bucket_name(bucket_property().value()); +} + ss::future<> controller::wire_up() { return _as.start() .then([this] { return _members_table.start(); }) @@ -606,14 +619,11 @@ controller::start(cluster_discovery& discovery, ss::abort_source& shard0_as) { &partition_balancer_backend::start); }) .then([this] { - auto& bucket_property - = cloud_storage::configuration::get_bucket_config(); - if ( - !bucket_property.is_overriden() || !bucket_property().has_value() - || !_cloud_storage_api.local_is_initialized()) { + auto bucket_opt = get_configured_bucket(); + if (!bucket_opt.has_value()) { return; } - cloud_storage_clients::bucket_name bucket(bucket_property().value()); + cloud_storage_clients::bucket_name bucket = bucket_opt.value(); _metadata_uploader = std::make_unique( _raft_manager.local(), _storage.local(), @@ -735,8 +745,28 @@ ss::future<> controller::stop() { }); } -ss::future<> -controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { +ss::future +controller::find_cluster_manifest_in_cloud() { + if (!config::shard_local_cfg() + .cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) { + co_return cloud_metadata::error_outcome::no_matching_metadata; + } + auto& bucket_property = cloud_storage::configuration::get_bucket_config(); + if ( + !bucket_property.is_overriden() || !bucket_property().has_value() + || !_cloud_storage_api.local_is_initialized()) { + vlog( + clusterlog.debug, + "Cloud not configured, skipping cluster recovery check"); + co_return cloud_metadata::error_outcome::no_matching_metadata; + } + cloud_storage_clients::bucket_name bucket(bucket_property().value()); + retry_chain_node retry_node(_as.local()); + co_return co_await cloud_metadata::download_highest_manifest_in_bucket( + _cloud_storage_api.local(), bucket, retry_node); +} + +ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) { vassert( ss::this_shard_id() == controller_stm_shard, "Cluster can only be created from controller_stm_shard"); @@ -764,6 +794,44 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { co_return; } + // Check if there is any cluster metadata in the cloud. + auto bucket_opt = get_configured_bucket(); + if (bucket_opt.has_value()) { + retry_chain_node retry_node(_as.local(), 300s, 5s); + auto res + = co_await cloud_metadata::download_highest_manifest_in_bucket( + _cloud_storage_api.local(), bucket_opt.value(), retry_node); + if (res.has_value()) { + vlog( + clusterlog.info, + "Found cluster metadata manifest {} in bucket {}", + res.value(), + bucket_opt.value()); + cmd_data.recovery_manifest = std::move(res.value()); + cmd_data.recovery_bucket = bucket_opt.value(); + } else { + const auto& err = res.error(); + if ( + err == cloud_metadata::error_outcome::no_matching_metadata) { + vlog( + clusterlog.info, + "No cluster manifest in bucket {}, proceeding without " + "recovery", + bucket_opt.value()); + } else { + vlog( + clusterlog.error, + "Error looking for cluster recovery material in cloud, " + "retrying: {}", + err); + co_await ss::sleep_abortable( + retry_jitter.next_duration(), _as.local()); + continue; + } + // No metadata in cloud: this isn't a recovery cluster. + } + } + vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid); const std::error_code errc = co_await replicate_and_wait( _stm, diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index 5c43807aab5cb..0787887611205 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -12,6 +12,7 @@ #pragma once #include "cloud_storage/fwd.h" +#include "cluster/cloud_metadata/cluster_manifest.h" #include "cluster/controller_probe.h" #include "cluster/controller_stm.h" #include "cluster/fwd.h" @@ -234,6 +235,18 @@ class controller { ss::future<> cluster_creation_hook(cluster_discovery& discovery); + std::optional get_configured_bucket(); + + /** + * Looks in the cloud for a cluster metadata manifest with which to recover + * a cluster. + * + * Returns \c no_matching_metadata if there is no metadata or cloud isn't + * configured. + */ + ss::future + find_cluster_manifest_in_cloud(); + // Checks configuration invariants stored in kvstore ss::future<> validate_configuration_invariants(); diff --git a/tests/rptest/tests/cluster_recovery_test.py b/tests/rptest/tests/cluster_recovery_test.py index aeb0605f261ee..805ac2ded19e1 100644 --- a/tests/rptest/tests/cluster_recovery_test.py +++ b/tests/rptest/tests/cluster_recovery_test.py @@ -131,3 +131,50 @@ def cluster_recovery_complete(): if u in l and "ALLOW" in l and "DESCRIBE" in l: found = True assert found, f"Couldn't find {u} in {acls_lines}" + + @cluster(num_nodes=4) + def test_bootstrap_with_recovery(self): + """ + Smoke test that configuring automated recovery at bootstrap will kick + in as appropriate. + """ + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set( + "cloud_storage_attempt_cluster_recovery_on_bootstrap", True) + for t in self.topics: + KgoVerifierProducer.oneshot(self.test_context, + self.redpanda, + t.name, + self.message_size, + 100, + batch_max_bytes=self.message_size * 8, + timeout_sec=60) + quiesce_uploads(self.redpanda, [t.name for t in self.topics], + timeout_sec=60) + time.sleep(5) + + self.redpanda.stop() + for n in self.redpanda.nodes: + self.redpanda.remove_local_data(n) + + # Restart the nodes, overriding the recovery bootstrap config. + extra_rp_conf = dict( + cloud_storage_attempt_cluster_recovery_on_bootstrap=True) + self.redpanda.set_extra_rp_conf(extra_rp_conf) + self.redpanda.write_bootstrap_cluster_config() + self.redpanda.restart_nodes(self.redpanda.nodes, + override_cfg_params=extra_rp_conf) + + # We should see a recovery begin automatically. + self.redpanda._admin.await_stable_leader("controller", + partition=0, + namespace='redpanda', + timeout_s=60, + backoff_s=2) + + def cluster_recovery_complete(): + return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status( + ).json()["state"] + + wait_until(cluster_recovery_complete, timeout_sec=30, backoff_sec=1) + self.redpanda.restart_nodes(self.redpanda.nodes)