Skip to content

Commit

Permalink
Merge pull request redpanda-data#15066 from andrwng/cluster-recovery-…
Browse files Browse the repository at this point in the history
…bootstrap

Config to attempt cluster recovery when bootstrapping cluster
  • Loading branch information
andrwng authored Nov 28, 2023
2 parents 60fc98c + 2ec9858 commit 3327b9e
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 33 deletions.
17 changes: 15 additions & 2 deletions src/v/cluster/bootstrap_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/commands.h"
#include "cluster/controller_snapshot.h"
#include "cluster/feature_backend.h"
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/members_manager.h"
#include "cluster/types.h"
Expand All @@ -40,12 +41,14 @@ bootstrap_backend::bootstrap_backend(
ss::sharded<storage::api>& storage,
ss::sharded<members_manager>& members_manager,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<feature_backend>& feature_backend)
ss::sharded<feature_backend>& feature_backend,
ss::sharded<cluster_recovery_table>& cluster_recovery_table)
: _credentials(credentials)
, _storage(storage)
, _members_manager(members_manager)
, _feature_table(feature_table)
, _feature_backend(feature_backend) {}
, _feature_backend(feature_backend)
, _cluster_recovery_table(cluster_recovery_table) {}

namespace {

Expand Down Expand Up @@ -194,6 +197,16 @@ bootstrap_backend::apply(bootstrap_cluster_cmd cmd, model::offset offset) {
}
}

// If this is a recovery cluster, initialize recovery state.
if (cmd.value.recovery_state.has_value()) {
co_await _cluster_recovery_table.invoke_on_all(
[o = offset,
m = cmd.value.recovery_state->manifest,
b = cmd.value.recovery_state->bucket](auto& recovery_table) {
recovery_table.apply(o, m, b, wait_for_nodes::yes);
});
}

co_await apply_cluster_uuid(cmd.value.uuid);

co_return errc::success;
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/bootstrap_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class bootstrap_backend final {
ss::sharded<storage::api>&,
ss::sharded<members_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<feature_backend>&);
ss::sharded<feature_backend>&,
ss::sharded<cluster_recovery_table>&);

ss::future<std::error_code> apply_update(model::record_batch);

Expand All @@ -65,6 +66,7 @@ class bootstrap_backend final {
ss::sharded<members_manager>& _members_manager;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<feature_backend>& _feature_backend;
ss::sharded<cluster_recovery_table>& _cluster_recovery_table;
std::optional<model::cluster_uuid> _cluster_uuid_applied;
};

Expand Down
37 changes: 37 additions & 0 deletions src/v/cluster/cloud_metadata/cluster_recovery_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,43 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() {
clusterlog.info,
"Downloaded controller snapshot. Proceeding with reconciliation...");

if (recovery_state.wait_for_nodes) {
const auto& nodes = controller_snap.value().members.nodes;
vlog(
clusterlog.info,
"Original cluster had {} nodes. Waiting for cluster "
"membership...",
nodes.size());
retry_chain_node membership_retry(term_as, 600s, 10s);
while (_members_table.node_count() < nodes.size()) {
if (term_as.abort_requested()) {
co_return;
}
auto permit = membership_retry.retry();
if (!permit.is_allowed) {
co_await _recovery_manager.replicate_update(
synced_term,
recovery_stage::failed,
ssx::sformat(
"Timed out waiting for cluster, {}/{} nodes...",
_members_table.node_count(),
nodes.size()));
co_return;
}
vlog(
clusterlog.info,
"Cluster only has reached {}/{} nodes, waiting...",
_members_table.node_count(),
nodes.size());
co_await ss::sleep_abortable(permit.delay, term_as);
}
vlog(
clusterlog.info,
"Cluster has reached {}/{} nodes, proceeding...",
_members_table.node_count(),
nodes.size());
}

// We may need to restore state from the controller snapshot.
cloud_metadata::controller_snapshot_reconciler reconciler(
_recovery_table.local(),
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/cluster_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ cluster_recovery_manager::initialize_recovery(
// Replicate an update to start recovery. Once applied, this will update
// the recovery table.
cluster_recovery_init_cmd_data data;
data.manifest = std::move(manifest);
data.bucket = std::move(bucket);
data.state.manifest = std::move(manifest);
data.state.bucket = std::move(bucket);
auto errc = co_await replicate_and_wait(
_controller_stm,
_sharded_as,
Expand Down
21 changes: 18 additions & 3 deletions src/v/cluster/cluster_recovery_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
#include "serde/envelope.h"
#include "serde/serde.h"

#include <seastar/util/bool_class.hh>

namespace cluster {

using wait_for_nodes = ss::bool_class<struct wait_tag>;

// Tracks the state of an on-going cluster recovery. This only exposes an
// interface to expose and modify the current status of recovery; an external
// caller should use these to actually drive recovery.
Expand All @@ -35,10 +39,14 @@ struct cluster_recovery_state
cluster_recovery_state() = default;
explicit cluster_recovery_state(
cloud_metadata::cluster_metadata_manifest manifest,
cloud_storage_clients::bucket_name bucket)
cloud_storage_clients::bucket_name bucket,
wait_for_nodes wait)
: manifest(std::move(manifest))
, bucket(std::move(bucket)) {}
auto serde_fields() { return std::tie(stage, manifest, bucket, error_msg); }
, bucket(std::move(bucket))
, wait_for_nodes(wait) {}
auto serde_fields() {
return std::tie(stage, manifest, bucket, wait_for_nodes, error_msg);
}

bool is_active() const {
return !(
Expand All @@ -58,6 +66,13 @@ struct cluster_recovery_state
// Bucket being recovered from.
cloud_storage_clients::bucket_name bucket;

// Whether the recovery should wait for the cluster to become at least the
// size of the original cluster before proceeding with recovery.
//
// This may be desirable when the recovery was started as a part of cluster
// bootstrap.
wait_for_nodes wait_for_nodes;

// Only applicable when failed.
std::optional<ss::sstring> error_msg;
};
Expand Down
23 changes: 18 additions & 5 deletions src/v/cluster/cluster_recovery_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "cluster/cluster_recovery_table.h"

#include "cluster/cloud_metadata/cluster_manifest.h"
#include "cluster/cluster_recovery_state.h"
#include "cluster/logger.h"
#include "cluster/types.h"

Expand Down Expand Up @@ -53,7 +54,10 @@ ss::future<> cluster_recovery_table::wait_for_active_recovery() {
}

std::error_code cluster_recovery_table::apply(
model::offset offset, cluster_recovery_init_cmd cmd) {
model::offset offset,
cloud_metadata::cluster_metadata_manifest manifest,
cloud_storage_clients::bucket_name bucket,
wait_for_nodes wait_for_nodes) {
if (!_states.empty() && _states.back().is_active()) {
return errc::update_in_progress;
}
Expand All @@ -62,16 +66,25 @@ std::error_code cluster_recovery_table::apply(
vlog(
clusterlog.info,
"Initializing cluster recovery at offset {} with manifest {} from bucket "
"{}",
"{}, waiting for nodes: {}",
offset,
cmd.value.manifest,
cmd.value.bucket);
manifest,
bucket,
wait_for_nodes);
_states.emplace_back(
std::move(cmd.value.manifest), std::move(cmd.value.bucket));
std::move(manifest), std::move(bucket), wait_for_nodes);
_has_active_recovery.signal();
return errc::success;
}

// Applies a replicated recovery-init command by unpacking its state and
// delegating to the field-wise overload.
//
// Fix: forward the command's serialized wait_for_nodes flag. Previously the
// flag carried in cmd.value.state was dropped, so the 4-arg overload's
// default (wait_for_nodes::no) was always used, even for init commands that
// requested waiting for cluster membership (e.g. recovery initiated at
// cluster bootstrap).
std::error_code cluster_recovery_table::apply(
  model::offset offset, cluster_recovery_init_cmd cmd) {
    return apply(
      offset,
      std::move(cmd.value.state.manifest),
      std::move(cmd.value.state.bucket),
      cmd.value.state.wait_for_nodes);
}

std::error_code
cluster_recovery_table::apply(model::offset, cluster_recovery_update_cmd cmd) {
if (_states.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/cluster_recovery_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class cluster_recovery_table {
return std::reference_wrapper(_states.back());
}

std::error_code apply(
model::offset offset,
cloud_metadata::cluster_metadata_manifest,
cloud_storage_clients::bucket_name,
wait_for_nodes wait = wait_for_nodes::no);
std::error_code apply(model::offset offset, cluster_recovery_init_cmd);
std::error_code apply(model::offset offset, cluster_recovery_update_cmd);

Expand Down
72 changes: 63 additions & 9 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
#include "cluster/controller.h"

#include "cluster/bootstrap_backend.h"
#include "cluster/cloud_metadata/cluster_manifest.h"
#include "cluster/cloud_metadata/cluster_recovery_backend.h"
#include "cluster/cloud_metadata/error_outcome.h"
#include "cluster/cloud_metadata/manifest_downloads.h"
#include "cluster/cloud_metadata/offsets_upload_rpc_types.h"
#include "cluster/cloud_metadata/producer_id_recovery_manager.h"
#include "cluster/cloud_metadata/uploader.h"
Expand Down Expand Up @@ -111,6 +114,17 @@ controller::controller(
// includes for destructors of all its members (e.g. the metadata uploader).
controller::~controller() = default;

// Returns the cloud-storage bucket to use for cluster metadata, or
// std::nullopt when no usable bucket is available.
//
// A bucket is only considered usable when all three hold: the bucket
// property was explicitly overridden in config, it actually holds a value,
// and the local cloud storage API has been initialized.
std::optional<cloud_storage_clients::bucket_name>
controller::get_configured_bucket() {
    const auto& bucket_cfg = cloud_storage::configuration::get_bucket_config();
    if (!bucket_cfg.is_overriden()) {
        return std::nullopt;
    }
    if (!bucket_cfg().has_value()) {
        return std::nullopt;
    }
    if (!_cloud_storage_api.local_is_initialized()) {
        return std::nullopt;
    }
    return cloud_storage_clients::bucket_name(bucket_cfg().value());
}

ss::future<> controller::wire_up() {
return _as.start()
.then([this] { return _members_table.start(); })
Expand Down Expand Up @@ -217,7 +231,8 @@ ss::future<> controller::start(
std::ref(_storage),
std::ref(_members_manager),
std::ref(_feature_table),
std::ref(_feature_backend));
std::ref(_feature_backend),
std::ref(_recovery_table));
})
.then([this] { return _recovery_table.start(); })
.then([this] {
Expand Down Expand Up @@ -612,14 +627,11 @@ ss::future<> controller::start(
&partition_balancer_backend::start);
})
.then([this, offsets_uploader, producer_id_recovery] {
auto& bucket_property
= cloud_storage::configuration::get_bucket_config();
if (
!bucket_property.is_overriden() || !bucket_property().has_value()
|| !_cloud_storage_api.local_is_initialized()) {
auto bucket_opt = get_configured_bucket();
if (!bucket_opt.has_value()) {
return;
}
cloud_storage_clients::bucket_name bucket(bucket_property().value());
cloud_storage_clients::bucket_name bucket = bucket_opt.value();
_metadata_uploader = std::make_unique<cloud_metadata::uploader>(
_raft_manager.local(),
_storage.local(),
Expand Down Expand Up @@ -745,8 +757,7 @@ ss::future<> controller::stop() {
});
}

ss::future<>
controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) {
vassert(
ss::this_shard_id() == controller_stm_shard,
"Cluster can only be created from controller_stm_shard");
Expand Down Expand Up @@ -774,6 +785,49 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
co_return;
}

// Check if there is any cluster metadata in the cloud.
auto bucket_opt = get_configured_bucket();
if (
bucket_opt.has_value()
&& config::shard_local_cfg()
.cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) {
retry_chain_node retry_node(_as.local(), 300s, 5s);
auto res
= co_await cloud_metadata::download_highest_manifest_in_bucket(
_cloud_storage_api.local(), bucket_opt.value(), retry_node);
if (res.has_value()) {
vlog(
clusterlog.info,
"Found cluster metadata manifest {} in bucket {}",
res.value(),
bucket_opt.value());
cmd_data.recovery_state.emplace();
cmd_data.recovery_state->manifest = std::move(res.value());
cmd_data.recovery_state->bucket = bucket_opt.value();
// Proceed with recovery via cluster bootstrap.
} else {
const auto& err = res.error();
if (
err == cloud_metadata::error_outcome::no_matching_metadata) {
vlog(
clusterlog.info,
"No cluster manifest in bucket {}, proceeding without "
"recovery",
bucket_opt.value());
// Fall through to regular cluster bootstrap.
} else {
vlog(
clusterlog.error,
"Error looking for cluster recovery material in cloud, "
"retrying: {}",
err);
co_await ss::sleep_abortable(
retry_jitter.next_duration(), _as.local());
continue;
}
}
}

vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid);
const std::error_code errc = co_await replicate_and_wait(
_stm,
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class controller {

ss::future<> cluster_creation_hook(cluster_discovery& discovery);

std::optional<cloud_storage_clients::bucket_name> get_configured_bucket();

// Checks configuration invariants stored in kvstore
ss::future<> validate_configuration_invariants();

Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/cluster_recovery_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace cluster {
namespace {
// Builds a recovery-init command whose manifest carries the given
// metadata id; all other command state is left default-constructed.
cluster_recovery_init_cmd make_init_cmd(int meta_id) {
    const auto id = cluster::cloud_metadata::cluster_metadata_id{meta_id};
    cluster_recovery_init_cmd_data data;
    data.state.manifest.metadata_id = id;
    return cluster_recovery_init_cmd{0, std::move(data)};
}
cluster_recovery_update_cmd make_update_cmd(
Expand Down
Loading

0 comments on commit 3327b9e

Please sign in to comment.