diff --git a/src/v/cluster/bootstrap_backend.cc b/src/v/cluster/bootstrap_backend.cc
index 3638bd68340ff..e4e72e77e09a3 100644
--- a/src/v/cluster/bootstrap_backend.cc
+++ b/src/v/cluster/bootstrap_backend.cc
@@ -15,6 +15,7 @@
 #include "cluster/commands.h"
 #include "cluster/controller_snapshot.h"
 #include "cluster/feature_backend.h"
+#include "cluster/fwd.h"
 #include "cluster/logger.h"
 #include "cluster/members_manager.h"
 #include "cluster/types.h"
@@ -40,12 +41,14 @@ bootstrap_backend::bootstrap_backend(
   ss::sharded<storage::api>& storage,
   ss::sharded<members_manager>& members_manager,
   ss::sharded<features::feature_table>& feature_table,
-  ss::sharded<feature_backend>& feature_backend)
+  ss::sharded<feature_backend>& feature_backend,
+  ss::sharded<cluster_recovery_table>& cluster_recovery_table)
   : _credentials(credentials)
   , _storage(storage)
   , _members_manager(members_manager)
   , _feature_table(feature_table)
-  , _feature_backend(feature_backend) {}
+  , _feature_backend(feature_backend)
+  , _cluster_recovery_table(cluster_recovery_table) {}
 
 namespace {
 
@@ -194,6 +197,16 @@ bootstrap_backend::apply(bootstrap_cluster_cmd cmd, model::offset offset) {
         }
     }
 
+    // If this is a recovery cluster, initialize recovery state.
+    if (cmd.value.recovery_state.has_value()) {
+        co_await _cluster_recovery_table.invoke_on_all(
+          [o = offset,
+           m = cmd.value.recovery_state->manifest,
+           b = cmd.value.recovery_state->bucket](auto& recovery_table) {
+              recovery_table.apply(o, m, b, wait_for_nodes::yes);
+          });
+    }
+
     co_await apply_cluster_uuid(cmd.value.uuid);
 
     co_return errc::success;
diff --git a/src/v/cluster/bootstrap_backend.h b/src/v/cluster/bootstrap_backend.h
index ad92d3ff3887e..54b572c866ebb 100644
--- a/src/v/cluster/bootstrap_backend.h
+++ b/src/v/cluster/bootstrap_backend.h
@@ -44,7 +44,8 @@ class bootstrap_backend final {
       ss::sharded<storage::api>&,
       ss::sharded<members_manager>&,
       ss::sharded<features::feature_table>&,
-      ss::sharded<feature_backend>&);
+      ss::sharded<feature_backend>&,
+      ss::sharded<cluster_recovery_table>&);
 
     ss::future<std::error_code> apply_update(model::record_batch);
 
@@ -65,6 +66,7 @@ class bootstrap_backend final {
     ss::sharded<members_manager>& _members_manager;
     ss::sharded<features::feature_table>& _feature_table;
     ss::sharded<feature_backend>& _feature_backend;
+    ss::sharded<cluster_recovery_table>& _cluster_recovery_table;
 
     std::optional<model::cluster_uuid> _cluster_uuid_applied;
 };
diff --git a/src/v/cluster/cluster_recovery_table.cc b/src/v/cluster/cluster_recovery_table.cc
index 15d7d7e8b948e..f33ffb3e19a0d 100644
--- a/src/v/cluster/cluster_recovery_table.cc
+++ b/src/v/cluster/cluster_recovery_table.cc
@@ -54,7 +54,10 @@ ss::future<> cluster_recovery_table::wait_for_active_recovery() {
 }
 
 std::error_code cluster_recovery_table::apply(
-  model::offset offset, cluster_recovery_init_cmd cmd) {
+  model::offset offset,
+  cloud_metadata::cluster_metadata_manifest manifest,
+  cloud_storage_clients::bucket_name bucket,
+  wait_for_nodes wait_for_nodes) {
     if (!_states.empty() && _states.back().is_active()) {
         return errc::update_in_progress;
     }
@@ -63,18 +66,25 @@ std::error_code cluster_recovery_table::apply(
     vlog(
       clusterlog.info,
       "Initializing cluster recovery at offset {} with manifest {} from bucket "
-      "{}",
+      "{}, waiting for nodes: {}",
       offset,
-      cmd.value.state.manifest,
-      cmd.value.state.bucket);
+      manifest,
+      bucket,
+      wait_for_nodes);
     _states.emplace_back(
-      std::move(cmd.value.state.manifest),
-      std::move(cmd.value.state.bucket),
-      wait_for_nodes::no);
+      std::move(manifest), std::move(bucket), wait_for_nodes);
     _has_active_recovery.signal();
     return errc::success;
 }
 
+std::error_code cluster_recovery_table::apply(
+  model::offset offset, cluster_recovery_init_cmd cmd) {
+    return apply(
+      offset,
+      std::move(cmd.value.state.manifest),
+      std::move(cmd.value.state.bucket));
+}
+
 std::error_code
 cluster_recovery_table::apply(model::offset, cluster_recovery_update_cmd cmd) {
     if (_states.empty()) {
diff --git a/src/v/cluster/cluster_recovery_table.h b/src/v/cluster/cluster_recovery_table.h
index c41f7863e85d4..d70948a488121 100644
--- a/src/v/cluster/cluster_recovery_table.h
+++ b/src/v/cluster/cluster_recovery_table.h
@@ -92,6 +92,11 @@ class cluster_recovery_table {
         return std::reference_wrapper(_states.back());
     }
 
+    std::error_code apply(
+      model::offset offset,
+      cloud_metadata::cluster_metadata_manifest,
+      cloud_storage_clients::bucket_name,
+      wait_for_nodes wait = wait_for_nodes::no);
     std::error_code apply(model::offset offset, cluster_recovery_init_cmd);
     std::error_code apply(model::offset offset, cluster_recovery_update_cmd);
 
diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc
index 40ea9da57c013..3a726cec65611 100644
--- a/src/v/cluster/controller.cc
+++ b/src/v/cluster/controller.cc
@@ -10,7 +10,10 @@
 #include "cluster/controller.h"
 
 #include "cluster/bootstrap_backend.h"
+#include "cluster/cloud_metadata/cluster_manifest.h"
 #include "cluster/cloud_metadata/cluster_recovery_backend.h"
+#include "cluster/cloud_metadata/error_outcome.h"
+#include "cluster/cloud_metadata/manifest_downloads.h"
 #include "cluster/cloud_metadata/offsets_upload_rpc_types.h"
 #include "cluster/cloud_metadata/producer_id_recovery_manager.h"
 #include "cluster/cloud_metadata/uploader.h"
@@ -111,6 +114,17 @@ controller::controller(
 // includes for destructors of all its members (e.g. the metadata uploader).
 controller::~controller() = default;
 
+std::optional<cloud_storage_clients::bucket_name>
+controller::get_configured_bucket() {
+    auto& bucket_property = cloud_storage::configuration::get_bucket_config();
+    if (
+      !bucket_property.is_overriden() || !bucket_property().has_value()
+      || !_cloud_storage_api.local_is_initialized()) {
+        return std::nullopt;
+    }
+    return cloud_storage_clients::bucket_name(bucket_property().value());
+}
+
 ss::future<> controller::wire_up() {
     return _as.start()
       .then([this] { return _members_table.start(); })
@@ -217,7 +231,8 @@ ss::future<> controller::start(
           std::ref(_storage),
           std::ref(_members_manager),
           std::ref(_feature_table),
-          std::ref(_feature_backend));
+          std::ref(_feature_backend),
+          std::ref(_recovery_table));
       })
       .then([this] { return _recovery_table.start(); })
      .then([this] {
@@ -612,14 +627,11 @@ ss::future<> controller::start(
             &partition_balancer_backend::start);
       })
       .then([this, offsets_uploader, producer_id_recovery] {
-          auto& bucket_property
-            = cloud_storage::configuration::get_bucket_config();
-          if (
-            !bucket_property.is_overriden() || !bucket_property().has_value()
-            || !_cloud_storage_api.local_is_initialized()) {
+          auto bucket_opt = get_configured_bucket();
+          if (!bucket_opt.has_value()) {
              return;
          }
-          cloud_storage_clients::bucket_name bucket(bucket_property().value());
+          cloud_storage_clients::bucket_name bucket = bucket_opt.value();
           _metadata_uploader = std::make_unique<cloud_metadata::uploader>(
             _raft_manager.local(),
             _storage.local(),
@@ -745,8 +757,7 @@ ss::future<> controller::stop() {
     });
 }
 
-ss::future<>
-controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
+ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) {
     vassert(
       ss::this_shard_id() == controller_stm_shard,
       "Cluster can only be created from controller_stm_shard");
@@ -774,6 +785,49 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
         co_return;
     }
 
+    // Check if there is any cluster metadata in the cloud.
+    auto bucket_opt = get_configured_bucket();
+    if (
+      bucket_opt.has_value()
+      && config::shard_local_cfg()
+           .cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) {
+        retry_chain_node retry_node(_as.local(), 300s, 5s);
+        auto res
+          = co_await cloud_metadata::download_highest_manifest_in_bucket(
+            _cloud_storage_api.local(), bucket_opt.value(), retry_node);
+        if (res.has_value()) {
+            vlog(
+              clusterlog.info,
+              "Found cluster metadata manifest {} in bucket {}",
+              res.value(),
+              bucket_opt.value());
+            cmd_data.recovery_state.emplace();
+            cmd_data.recovery_state->manifest = std::move(res.value());
+            cmd_data.recovery_state->bucket = bucket_opt.value();
+            // Proceed with recovery via cluster bootstrap.
+        } else {
+            const auto& err = res.error();
+            if (
+              err == cloud_metadata::error_outcome::no_matching_metadata) {
+                vlog(
+                  clusterlog.info,
+                  "No cluster manifest in bucket {}, proceeding without "
+                  "recovery",
+                  bucket_opt.value());
+                // Fall through to regular cluster bootstrap.
+            } else {
+                vlog(
+                  clusterlog.error,
+                  "Error looking for cluster recovery material in cloud, "
+                  "retrying: {}",
+                  err);
+                co_await ss::sleep_abortable(
+                  retry_jitter.next_duration(), _as.local());
+                continue;
+            }
+        }
+    }
+
     vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid);
     const std::error_code errc = co_await replicate_and_wait(
       _stm,
diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h
index 178dba9ca1b26..f535c161b889f 100644
--- a/src/v/cluster/controller.h
+++ b/src/v/cluster/controller.h
@@ -241,6 +241,8 @@ class controller {
 
     ss::future<> cluster_creation_hook(cluster_discovery& discovery);
 
+    std::optional<cloud_storage_clients::bucket_name> get_configured_bucket();
+
     // Checks configuration invariants stored in kvstore
     ss::future<> validate_configuration_invariants();
 
diff --git a/tests/rptest/tests/cluster_recovery_test.py b/tests/rptest/tests/cluster_recovery_test.py
index aeb0605f261ee..0de4e8c5ab6ef 100644
--- a/tests/rptest/tests/cluster_recovery_test.py
+++ b/tests/rptest/tests/cluster_recovery_test.py
@@ -131,3 +131,50 @@ def cluster_recovery_complete():
                 if u in l and "ALLOW" in l and "DESCRIBE" in l:
                     found = True
             assert found, f"Couldn't find {u} in {acls_lines}"
+
+    @cluster(num_nodes=4)
+    def test_bootstrap_with_recovery(self):
+        """
+        Smoke test that configuring automated recovery at bootstrap will kick
+        in as appropriate.
+        """
+        rpk = RpkTool(self.redpanda)
+        rpk.cluster_config_set(
+            "cloud_storage_attempt_cluster_recovery_on_bootstrap", True)
+        for t in self.topics:
+            KgoVerifierProducer.oneshot(self.test_context,
+                                        self.redpanda,
+                                        t.name,
+                                        self.message_size,
+                                        100,
+                                        batch_max_bytes=self.message_size * 8,
+                                        timeout_sec=60)
+        quiesce_uploads(self.redpanda, [t.name for t in self.topics],
+                        timeout_sec=60)
+        time.sleep(5)
+
+        self.redpanda.stop()
+        for n in self.redpanda.nodes:
+            self.redpanda.remove_local_data(n)
+
+        # Restart the nodes, overriding the recovery bootstrap config.
+        extra_rp_conf = dict(
+            cloud_storage_attempt_cluster_recovery_on_bootstrap=True)
+        self.redpanda.set_extra_rp_conf(extra_rp_conf)
+        self.redpanda.write_bootstrap_cluster_config()
+        self.redpanda.restart_nodes(self.redpanda.nodes,
+                                    override_cfg_params=extra_rp_conf)
+
+        # We should see a recovery begin automatically.
+        self.redpanda._admin.await_stable_leader("controller",
+                                                 partition=0,
+                                                 namespace='redpanda',
+                                                 timeout_s=60,
+                                                 backoff_s=2)
+
+        def cluster_recovery_complete():
+            return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status(
+            ).json()["state"]
+
+        wait_until(cluster_recovery_complete, timeout_sec=60, backoff_sec=1)
+        self.redpanda.restart_nodes(self.redpanda.nodes)
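
Note (illustrative, not part of the patch): the bootstrap-time decision that the create_cluster change introduces can be summarized, outside of Seastar and the cloud client, as a small standalone sketch. The names BootstrapDecision, ManifestLookup and decide_bootstrap below are invented for illustration and do not exist in the tree; the real code additionally owns the retry/backoff loop, abort handling, and the actual manifest download.

#include <cassert>
#include <optional>
#include <string>

// Illustrative stand-ins; the patch itself works with
// cloud_metadata::cluster_metadata_manifest, cloud_storage_clients::bucket_name
// and cloud_metadata::error_outcome.
enum class BootstrapDecision { recover_from_cloud, regular_bootstrap, retry_lookup };

struct ManifestLookup {
    bool found = false;       // a cluster metadata manifest was downloaded
    bool no_metadata = false; // lookup succeeded, but the bucket holds no cluster metadata
};

// Mirrors the three outcomes in the new create_cluster block: a found manifest
// seeds bootstrap_cluster_cmd_data::recovery_state, "no matching metadata"
// falls through to a normal bootstrap, and any other lookup error makes the
// bootstrap loop sleep and retry.
BootstrapDecision decide_bootstrap(
  const std::optional<std::string>& configured_bucket,
  bool recovery_on_bootstrap_enabled,
  const ManifestLookup& lookup) {
    if (!configured_bucket.has_value() || !recovery_on_bootstrap_enabled) {
        return BootstrapDecision::regular_bootstrap;
    }
    if (lookup.found) {
        return BootstrapDecision::recover_from_cloud;
    }
    if (lookup.no_metadata) {
        return BootstrapDecision::regular_bootstrap;
    }
    return BootstrapDecision::retry_lookup;
}

int main() {
    // Manifest found in the configured bucket: bootstrap as a recovery cluster.
    assert(
      decide_bootstrap("bucket", true, {.found = true})
      == BootstrapDecision::recover_from_cloud);
    // Bucket configured but no cluster metadata present: regular bootstrap.
    assert(
      decide_bootstrap("bucket", true, {.no_metadata = true})
      == BootstrapDecision::regular_bootstrap);
    // Any other lookup error: the caller sleeps and retries.
    assert(decide_bootstrap("bucket", true, {}) == BootstrapDecision::retry_lookup);
    return 0;
}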