From 2ec9858174f5a1a610f3c1fadf75cf251330068c Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 24 Nov 2023 00:56:12 -0800 Subject: [PATCH] cluster: look for cluster manifest and start recovery on bootstrap This updates the cluster bootstrap sequence such that, when configured, Redpanda will look for cluster metadata in the cloud and then immediately start a recovery if any is found. --- src/v/cluster/bootstrap_backend.cc | 17 ++++- src/v/cluster/bootstrap_backend.h | 4 +- src/v/cluster/cluster_recovery_table.cc | 24 +++++-- src/v/cluster/cluster_recovery_table.h | 5 ++ src/v/cluster/controller.cc | 72 ++++++++++++++++++--- src/v/cluster/controller.h | 2 + tests/rptest/tests/cluster_recovery_test.py | 47 ++++++++++++++ 7 files changed, 152 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/bootstrap_backend.cc b/src/v/cluster/bootstrap_backend.cc index 3638bd68340f..e4e72e77e09a 100644 --- a/src/v/cluster/bootstrap_backend.cc +++ b/src/v/cluster/bootstrap_backend.cc @@ -15,6 +15,7 @@ #include "cluster/commands.h" #include "cluster/controller_snapshot.h" #include "cluster/feature_backend.h" +#include "cluster/fwd.h" #include "cluster/logger.h" #include "cluster/members_manager.h" #include "cluster/types.h" @@ -40,12 +41,14 @@ bootstrap_backend::bootstrap_backend( ss::sharded& storage, ss::sharded& members_manager, ss::sharded& feature_table, - ss::sharded& feature_backend) + ss::sharded& feature_backend, + ss::sharded& cluster_recovery_table) : _credentials(credentials) , _storage(storage) , _members_manager(members_manager) , _feature_table(feature_table) - , _feature_backend(feature_backend) {} + , _feature_backend(feature_backend) + , _cluster_recovery_table(cluster_recovery_table) {} namespace { @@ -194,6 +197,16 @@ bootstrap_backend::apply(bootstrap_cluster_cmd cmd, model::offset offset) { } } + // If this is a recovery cluster, initialize recovery state. + if (cmd.value.recovery_state.has_value()) { + co_await _cluster_recovery_table.invoke_on_all( + [o = offset, + m = cmd.value.recovery_state->manifest, + b = cmd.value.recovery_state->bucket](auto& recovery_table) { + recovery_table.apply(o, m, b, wait_for_nodes::yes); + }); + } + co_await apply_cluster_uuid(cmd.value.uuid); co_return errc::success; diff --git a/src/v/cluster/bootstrap_backend.h b/src/v/cluster/bootstrap_backend.h index ad92d3ff3887..54b572c866eb 100644 --- a/src/v/cluster/bootstrap_backend.h +++ b/src/v/cluster/bootstrap_backend.h @@ -44,7 +44,8 @@ class bootstrap_backend final { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future apply_update(model::record_batch); @@ -65,6 +66,7 @@ class bootstrap_backend final { ss::sharded& _members_manager; ss::sharded& _feature_table; ss::sharded& _feature_backend; + ss::sharded& _cluster_recovery_table; std::optional _cluster_uuid_applied; }; diff --git a/src/v/cluster/cluster_recovery_table.cc b/src/v/cluster/cluster_recovery_table.cc index 15d7d7e8b948..f33ffb3e19a0 100644 --- a/src/v/cluster/cluster_recovery_table.cc +++ b/src/v/cluster/cluster_recovery_table.cc @@ -54,7 +54,10 @@ ss::future<> cluster_recovery_table::wait_for_active_recovery() { } std::error_code cluster_recovery_table::apply( - model::offset offset, cluster_recovery_init_cmd cmd) { + model::offset offset, + cloud_metadata::cluster_metadata_manifest manifest, + cloud_storage_clients::bucket_name bucket, + wait_for_nodes wait_for_nodes) { if (!_states.empty() && _states.back().is_active()) { return errc::update_in_progress; } @@ -63,18 +66,25 @@ std::error_code cluster_recovery_table::apply( vlog( clusterlog.info, "Initializing cluster recovery at offset {} with manifest {} from bucket " - "{}", + "{}, waiting for nodes: {}", offset, - cmd.value.state.manifest, - cmd.value.state.bucket); + manifest, + bucket, + wait_for_nodes); _states.emplace_back( - std::move(cmd.value.state.manifest), - std::move(cmd.value.state.bucket), - wait_for_nodes::no); + std::move(manifest), std::move(bucket), wait_for_nodes); _has_active_recovery.signal(); return errc::success; } +std::error_code cluster_recovery_table::apply( + model::offset offset, cluster_recovery_init_cmd cmd) { + return apply( + offset, + std::move(cmd.value.state.manifest), + std::move(cmd.value.state.bucket)); +} + std::error_code cluster_recovery_table::apply(model::offset, cluster_recovery_update_cmd cmd) { if (_states.empty()) { diff --git a/src/v/cluster/cluster_recovery_table.h b/src/v/cluster/cluster_recovery_table.h index c41f7863e85d..d70948a48812 100644 --- a/src/v/cluster/cluster_recovery_table.h +++ b/src/v/cluster/cluster_recovery_table.h @@ -92,6 +92,11 @@ class cluster_recovery_table { return std::reference_wrapper(_states.back()); } + std::error_code apply( + model::offset offset, + cloud_metadata::cluster_metadata_manifest, + cloud_storage_clients::bucket_name, + wait_for_nodes wait = wait_for_nodes::no); std::error_code apply(model::offset offset, cluster_recovery_init_cmd); std::error_code apply(model::offset offset, cluster_recovery_update_cmd); diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 40ea9da57c01..3a726cec6561 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -10,7 +10,10 @@ #include "cluster/controller.h" #include "cluster/bootstrap_backend.h" +#include "cluster/cloud_metadata/cluster_manifest.h" #include "cluster/cloud_metadata/cluster_recovery_backend.h" +#include "cluster/cloud_metadata/error_outcome.h" +#include "cluster/cloud_metadata/manifest_downloads.h" #include "cluster/cloud_metadata/offsets_upload_rpc_types.h" #include "cluster/cloud_metadata/producer_id_recovery_manager.h" #include "cluster/cloud_metadata/uploader.h" @@ -111,6 +114,17 @@ controller::controller( // includes for destructors of all its members (e.g. the metadata uploader). controller::~controller() = default; +std::optional +controller::get_configured_bucket() { + auto& bucket_property = cloud_storage::configuration::get_bucket_config(); + if ( + !bucket_property.is_overriden() || !bucket_property().has_value() + || !_cloud_storage_api.local_is_initialized()) { + return std::nullopt; + } + return cloud_storage_clients::bucket_name(bucket_property().value()); +} + ss::future<> controller::wire_up() { return _as.start() .then([this] { return _members_table.start(); }) @@ -217,7 +231,8 @@ ss::future<> controller::start( std::ref(_storage), std::ref(_members_manager), std::ref(_feature_table), - std::ref(_feature_backend)); + std::ref(_feature_backend), + std::ref(_recovery_table)); }) .then([this] { return _recovery_table.start(); }) .then([this] { @@ -612,14 +627,11 @@ ss::future<> controller::start( &partition_balancer_backend::start); }) .then([this, offsets_uploader, producer_id_recovery] { - auto& bucket_property - = cloud_storage::configuration::get_bucket_config(); - if ( - !bucket_property.is_overriden() || !bucket_property().has_value() - || !_cloud_storage_api.local_is_initialized()) { + auto bucket_opt = get_configured_bucket(); + if (!bucket_opt.has_value()) { return; } - cloud_storage_clients::bucket_name bucket(bucket_property().value()); + cloud_storage_clients::bucket_name bucket = bucket_opt.value(); _metadata_uploader = std::make_unique( _raft_manager.local(), _storage.local(), @@ -745,8 +757,7 @@ ss::future<> controller::stop() { }); } -ss::future<> -controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { +ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) { vassert( ss::this_shard_id() == controller_stm_shard, "Cluster can only be created from controller_stm_shard"); @@ -774,6 +785,49 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) { co_return; } + // Check if there is any cluster metadata in the cloud. + auto bucket_opt = get_configured_bucket(); + if ( + bucket_opt.has_value() + && config::shard_local_cfg() + .cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) { + retry_chain_node retry_node(_as.local(), 300s, 5s); + auto res + = co_await cloud_metadata::download_highest_manifest_in_bucket( + _cloud_storage_api.local(), bucket_opt.value(), retry_node); + if (res.has_value()) { + vlog( + clusterlog.info, + "Found cluster metadata manifest {} in bucket {}", + res.value(), + bucket_opt.value()); + cmd_data.recovery_state.emplace(); + cmd_data.recovery_state->manifest = std::move(res.value()); + cmd_data.recovery_state->bucket = bucket_opt.value(); + // Proceed with recovery via cluster bootstrap. + } else { + const auto& err = res.error(); + if ( + err == cloud_metadata::error_outcome::no_matching_metadata) { + vlog( + clusterlog.info, + "No cluster manifest in bucket {}, proceeding without " + "recovery", + bucket_opt.value()); + // Fall through to regular cluster bootstrap. + } else { + vlog( + clusterlog.error, + "Error looking for cluster recovery material in cloud, " + "retrying: {}", + err); + co_await ss::sleep_abortable( + retry_jitter.next_duration(), _as.local()); + continue; + } + } + } + vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid); const std::error_code errc = co_await replicate_and_wait( _stm, diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index 178dba9ca1b2..f535c161b889 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -241,6 +241,8 @@ class controller { ss::future<> cluster_creation_hook(cluster_discovery& discovery); + std::optional get_configured_bucket(); + // Checks configuration invariants stored in kvstore ss::future<> validate_configuration_invariants(); diff --git a/tests/rptest/tests/cluster_recovery_test.py b/tests/rptest/tests/cluster_recovery_test.py index aeb0605f261e..0de4e8c5ab6e 100644 --- a/tests/rptest/tests/cluster_recovery_test.py +++ b/tests/rptest/tests/cluster_recovery_test.py @@ -131,3 +131,50 @@ def cluster_recovery_complete(): if u in l and "ALLOW" in l and "DESCRIBE" in l: found = True assert found, f"Couldn't find {u} in {acls_lines}" + + @cluster(num_nodes=4) + def test_bootstrap_with_recovery(self): + """ + Smoke test that configuring automated recovery at bootstrap will kick + in as appropriate. + """ + rpk = RpkTool(self.redpanda) + rpk.cluster_config_set( + "cloud_storage_attempt_cluster_recovery_on_bootstrap", True) + for t in self.topics: + KgoVerifierProducer.oneshot(self.test_context, + self.redpanda, + t.name, + self.message_size, + 100, + batch_max_bytes=self.message_size * 8, + timeout_sec=60) + quiesce_uploads(self.redpanda, [t.name for t in self.topics], + timeout_sec=60) + time.sleep(5) + + self.redpanda.stop() + for n in self.redpanda.nodes: + self.redpanda.remove_local_data(n) + + # Restart the nodes, overriding the recovery bootstrap config. + extra_rp_conf = dict( + cloud_storage_attempt_cluster_recovery_on_bootstrap=True) + self.redpanda.set_extra_rp_conf(extra_rp_conf) + self.redpanda.write_bootstrap_cluster_config() + self.redpanda.restart_nodes(self.redpanda.nodes, + override_cfg_params=extra_rp_conf) + + # We should see a recovery begin automatically. + self.redpanda._admin.await_stable_leader("controller", + partition=0, + namespace='redpanda', + timeout_s=60, + backoff_s=2) + + def cluster_recovery_complete(): + return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status( + ).json()["state"] + + wait_until(cluster_recovery_complete, timeout_sec=60, backoff_sec=1) + self.redpanda.restart_nodes(self.redpanda.nodes)