Skip to content

Commit

Permalink
cluster: add recovery state to bootstrap cmd
Browse files Browse the repository at this point in the history
Recovery can be started with a new cluster config, in which case the
recovery will be added to the recovery table when applying the bootstrap
command.
  • Loading branch information
andrwng committed Nov 21, 2023
1 parent 9578da9 commit 94ce852
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 11 deletions.
28 changes: 26 additions & 2 deletions src/v/cluster/bootstrap_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/commands.h"
#include "cluster/controller_snapshot.h"
#include "cluster/feature_backend.h"
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/members_manager.h"
#include "cluster/types.h"
Expand All @@ -40,12 +41,14 @@ bootstrap_backend::bootstrap_backend(
ss::sharded<storage::api>& storage,
ss::sharded<members_manager>& members_manager,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<feature_backend>& feature_backend)
ss::sharded<feature_backend>& feature_backend,
ss::sharded<cluster_recovery_table>& cluster_recovery_table)
: _credentials(credentials)
, _storage(storage)
, _members_manager(members_manager)
, _feature_table(feature_table)
, _feature_backend(feature_backend) {}
, _feature_backend(feature_backend)
, _cluster_recovery_table(cluster_recovery_table) {}

namespace {

Expand Down Expand Up @@ -194,6 +197,27 @@ bootstrap_backend::apply(bootstrap_cluster_cmd cmd, model::offset offset) {
}
}

// If this is a recovery cluster, initialize recovery state.
if (
cmd.value.recovery_bucket.has_value()
!= cmd.value.recovery_manifest.has_value()) {
vlog(
clusterlog.error,
"Expected bucket and manifest together. Bucket: {}, manifest: {}",
cmd.value.recovery_bucket,
cmd.value.recovery_manifest);
}
if (
cmd.value.recovery_manifest.has_value()
&& cmd.value.recovery_bucket.has_value()) {
co_await _cluster_recovery_table.invoke_on_all(
[o = offset,
m = cmd.value.recovery_manifest.value(),
b = cmd.value.recovery_bucket.value()](auto& recovery_table) {
recovery_table.apply(o, m, b, wait_for_nodes::yes);
});
}

co_await apply_cluster_uuid(cmd.value.uuid);

co_return errc::success;
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/bootstrap_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class bootstrap_backend final {
ss::sharded<storage::api>&,
ss::sharded<members_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<feature_backend>&);
ss::sharded<feature_backend>&,
ss::sharded<cluster_recovery_table>&);

ss::future<std::error_code> apply_update(model::record_batch);

Expand All @@ -65,6 +66,7 @@ class bootstrap_backend final {
ss::sharded<members_manager>& _members_manager;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<feature_backend>& _feature_backend;
ss::sharded<cluster_recovery_table>& _cluster_recovery_table;
std::optional<model::cluster_uuid> _cluster_uuid_applied;
};

Expand Down
20 changes: 15 additions & 5 deletions src/v/cluster/cluster_recovery_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ ss::future<> cluster_recovery_table::wait_for_active_recovery() {
}

std::error_code cluster_recovery_table::apply(
model::offset offset, cluster_recovery_init_cmd cmd) {
model::offset offset,
cloud_metadata::cluster_metadata_manifest manifest,
cloud_storage_clients::bucket_name bucket,
wait_for_nodes wait_for_nodes) {
if (!_states.empty() && _states.back().is_active()) {
return errc::update_in_progress;
}
Expand All @@ -62,16 +65,23 @@ std::error_code cluster_recovery_table::apply(
vlog(
clusterlog.info,
"Initializing cluster recovery at offset {} with manifest {} from bucket "
"{}",
"{}, waiting for nodes: {}",
offset,
cmd.value.manifest,
cmd.value.bucket);
manifest,
bucket,
wait_for_nodes);
_states.emplace_back(
std::move(cmd.value.manifest), std::move(cmd.value.bucket));
std::move(manifest), std::move(bucket), wait_for_nodes);
_has_active_recovery.signal();
return errc::success;
}

// Applies a recovery-init command: unpacks the manifest and bucket carried by
// the command and forwards them, together with the offset, to the
// (offset, manifest, bucket, wait_for_nodes) overload. No wait_for_nodes
// argument is supplied here, so that overload's declared default is used.
// The error code produced by the delegate is returned unchanged.
std::error_code cluster_recovery_table::apply(
  model::offset offset, cluster_recovery_init_cmd cmd) {
    auto& init_data = cmd.value;
    return apply(
      offset, std::move(init_data.manifest), std::move(init_data.bucket));
}

std::error_code
cluster_recovery_table::apply(model::offset, cluster_recovery_update_cmd cmd) {
if (_states.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/cluster_recovery_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class cluster_recovery_table {
return std::reference_wrapper(_states.back());
}

// Registers a new cluster recovery described by the given manifest and
// bucket at the given controller offset.
//
// As implemented in cluster_recovery_table.cc: returns
// errc::update_in_progress when the most recent recovery state is still
// active; otherwise appends a new state built from (manifest, bucket, wait),
// signals _has_active_recovery, and returns errc::success.
//
// `wait` defaults to wait_for_nodes::no, which is what the
// cluster_recovery_init_cmd overload gets when it delegates here without
// passing the argument.
std::error_code apply(
model::offset offset,
cloud_metadata::cluster_metadata_manifest,
cloud_storage_clients::bucket_name,
wait_for_nodes wait = wait_for_nodes::no);
std::error_code apply(model::offset offset, cluster_recovery_init_cmd);
std::error_code apply(model::offset offset, cluster_recovery_update_cmd);

Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ controller::start(cluster_discovery& discovery, ss::abort_source& shard0_as) {
std::ref(_storage),
std::ref(_members_manager),
std::ref(_feature_table),
std::ref(_feature_backend));
std::ref(_feature_backend),
std::ref(_recovery_table));
})
.then([this] { return _recovery_table.start(); })
.then([this] {
Expand Down
11 changes: 9 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2957,7 +2957,7 @@ struct user_and_credential
struct bootstrap_cluster_cmd_data
: serde::envelope<
bootstrap_cluster_cmd_data,
serde::version<2>,
serde::version<3>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

Expand All @@ -2971,7 +2971,9 @@ struct bootstrap_cluster_cmd_data
bootstrap_user_cred,
node_ids_by_uuid,
founding_version,
initial_nodes);
initial_nodes,
recovery_manifest,
recovery_bucket);
}

model::cluster_uuid uuid;
Expand All @@ -2983,6 +2985,11 @@ struct bootstrap_cluster_cmd_data
// the node that generated the bootstrap record.
cluster_version founding_version{invalid_version};
std::vector<model::broker> initial_nodes;

// If these are set, begins a cluster recovery using this manifest as the
// basis.
std::optional<cloud_metadata::cluster_metadata_manifest> recovery_manifest;
std::optional<cloud_storage_clients::bucket_name> recovery_bucket;
};

struct cluster_recovery_init_cmd_data
Expand Down

0 comments on commit 94ce852

Please sign in to comment.