Skip to content

Commit

Permalink
cluster: look for cluster manifest and start recovery on bootstrap
Browse files Browse the repository at this point in the history
This updates the cluster bootstrap sequence such that, when configured,
Redpanda will look for cluster metadata in the cloud and then
immediately start a recovery if any is found.
  • Loading branch information
andrwng committed Nov 24, 2023
1 parent 048ee23 commit 2ec9858
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 19 deletions.
17 changes: 15 additions & 2 deletions src/v/cluster/bootstrap_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/commands.h"
#include "cluster/controller_snapshot.h"
#include "cluster/feature_backend.h"
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/members_manager.h"
#include "cluster/types.h"
Expand All @@ -40,12 +41,14 @@ bootstrap_backend::bootstrap_backend(
ss::sharded<storage::api>& storage,
ss::sharded<members_manager>& members_manager,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<feature_backend>& feature_backend)
ss::sharded<feature_backend>& feature_backend,
ss::sharded<cluster_recovery_table>& cluster_recovery_table)
: _credentials(credentials)
, _storage(storage)
, _members_manager(members_manager)
, _feature_table(feature_table)
, _feature_backend(feature_backend) {}
, _feature_backend(feature_backend)
, _cluster_recovery_table(cluster_recovery_table) {}

namespace {

Expand Down Expand Up @@ -194,6 +197,16 @@ bootstrap_backend::apply(bootstrap_cluster_cmd cmd, model::offset offset) {
}
}

// If this is a recovery cluster, initialize recovery state.
if (cmd.value.recovery_state.has_value()) {
co_await _cluster_recovery_table.invoke_on_all(
[o = offset,
m = cmd.value.recovery_state->manifest,
b = cmd.value.recovery_state->bucket](auto& recovery_table) {
recovery_table.apply(o, m, b, wait_for_nodes::yes);
});
}

co_await apply_cluster_uuid(cmd.value.uuid);

co_return errc::success;
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/bootstrap_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class bootstrap_backend final {
ss::sharded<storage::api>&,
ss::sharded<members_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<feature_backend>&);
ss::sharded<feature_backend>&,
ss::sharded<cluster_recovery_table>&);

ss::future<std::error_code> apply_update(model::record_batch);

Expand All @@ -65,6 +66,7 @@ class bootstrap_backend final {
ss::sharded<members_manager>& _members_manager;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<feature_backend>& _feature_backend;
ss::sharded<cluster_recovery_table>& _cluster_recovery_table;
std::optional<model::cluster_uuid> _cluster_uuid_applied;
};

Expand Down
24 changes: 17 additions & 7 deletions src/v/cluster/cluster_recovery_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ ss::future<> cluster_recovery_table::wait_for_active_recovery() {
}

std::error_code cluster_recovery_table::apply(
model::offset offset, cluster_recovery_init_cmd cmd) {
model::offset offset,
cloud_metadata::cluster_metadata_manifest manifest,
cloud_storage_clients::bucket_name bucket,
wait_for_nodes wait_for_nodes) {
if (!_states.empty() && _states.back().is_active()) {
return errc::update_in_progress;
}
Expand All @@ -63,18 +66,25 @@ std::error_code cluster_recovery_table::apply(
vlog(
clusterlog.info,
"Initializing cluster recovery at offset {} with manifest {} from bucket "
"{}",
"{}, waiting for nodes: {}",
offset,
cmd.value.state.manifest,
cmd.value.state.bucket);
manifest,
bucket,
wait_for_nodes);
_states.emplace_back(
std::move(cmd.value.state.manifest),
std::move(cmd.value.state.bucket),
wait_for_nodes::no);
std::move(manifest), std::move(bucket), wait_for_nodes);
_has_active_recovery.signal();
return errc::success;
}

std::error_code cluster_recovery_table::apply(
  model::offset offset, cluster_recovery_init_cmd cmd) {
    // Thin adapter: unpack the recovery state carried by the replicated
    // command and forward to the overload that takes the manifest and bucket
    // directly. The forwarded call uses that overload's default for waiting
    // on nodes.
    auto& state = cmd.value.state;
    return apply(offset, std::move(state.manifest), std::move(state.bucket));
}

std::error_code
cluster_recovery_table::apply(model::offset, cluster_recovery_update_cmd cmd) {
if (_states.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/cluster_recovery_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class cluster_recovery_table {
return std::reference_wrapper(_states.back());
}

std::error_code apply(
model::offset offset,
cloud_metadata::cluster_metadata_manifest,
cloud_storage_clients::bucket_name,
wait_for_nodes wait = wait_for_nodes::no);
std::error_code apply(model::offset offset, cluster_recovery_init_cmd);
std::error_code apply(model::offset offset, cluster_recovery_update_cmd);

Expand Down
72 changes: 63 additions & 9 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
#include "cluster/controller.h"

#include "cluster/bootstrap_backend.h"
#include "cluster/cloud_metadata/cluster_manifest.h"
#include "cluster/cloud_metadata/cluster_recovery_backend.h"
#include "cluster/cloud_metadata/error_outcome.h"
#include "cluster/cloud_metadata/manifest_downloads.h"
#include "cluster/cloud_metadata/offsets_upload_rpc_types.h"
#include "cluster/cloud_metadata/producer_id_recovery_manager.h"
#include "cluster/cloud_metadata/uploader.h"
Expand Down Expand Up @@ -111,6 +114,17 @@ controller::controller(
// includes for destructors of all its members (e.g. the metadata uploader).
controller::~controller() = default;

std::optional<cloud_storage_clients::bucket_name>
controller::get_configured_bucket() {
    // The cloud storage bucket is only usable when it has been explicitly
    // configured (overridden with a value) and the cloud storage API has
    // been initialized on the local shard; otherwise report "no bucket".
    auto& bucket_cfg = cloud_storage::configuration::get_bucket_config();
    if (!bucket_cfg.is_overriden() || !bucket_cfg().has_value()) {
        return std::nullopt;
    }
    if (!_cloud_storage_api.local_is_initialized()) {
        return std::nullopt;
    }
    return cloud_storage_clients::bucket_name(bucket_cfg().value());
}

ss::future<> controller::wire_up() {
return _as.start()
.then([this] { return _members_table.start(); })
Expand Down Expand Up @@ -217,7 +231,8 @@ ss::future<> controller::start(
std::ref(_storage),
std::ref(_members_manager),
std::ref(_feature_table),
std::ref(_feature_backend));
std::ref(_feature_backend),
std::ref(_recovery_table));
})
.then([this] { return _recovery_table.start(); })
.then([this] {
Expand Down Expand Up @@ -612,14 +627,11 @@ ss::future<> controller::start(
&partition_balancer_backend::start);
})
.then([this, offsets_uploader, producer_id_recovery] {
auto& bucket_property
= cloud_storage::configuration::get_bucket_config();
if (
!bucket_property.is_overriden() || !bucket_property().has_value()
|| !_cloud_storage_api.local_is_initialized()) {
auto bucket_opt = get_configured_bucket();
if (!bucket_opt.has_value()) {
return;
}
cloud_storage_clients::bucket_name bucket(bucket_property().value());
cloud_storage_clients::bucket_name bucket = bucket_opt.value();
_metadata_uploader = std::make_unique<cloud_metadata::uploader>(
_raft_manager.local(),
_storage.local(),
Expand Down Expand Up @@ -745,8 +757,7 @@ ss::future<> controller::stop() {
});
}

ss::future<>
controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) {
vassert(
ss::this_shard_id() == controller_stm_shard,
"Cluster can only be created from controller_stm_shard");
Expand Down Expand Up @@ -774,6 +785,49 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
co_return;
}

// Check if there is any cluster metadata in the cloud.
auto bucket_opt = get_configured_bucket();
if (
bucket_opt.has_value()
&& config::shard_local_cfg()
.cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) {
retry_chain_node retry_node(_as.local(), 300s, 5s);
auto res
= co_await cloud_metadata::download_highest_manifest_in_bucket(
_cloud_storage_api.local(), bucket_opt.value(), retry_node);
if (res.has_value()) {
vlog(
clusterlog.info,
"Found cluster metadata manifest {} in bucket {}",
res.value(),
bucket_opt.value());
cmd_data.recovery_state.emplace();
cmd_data.recovery_state->manifest = std::move(res.value());
cmd_data.recovery_state->bucket = bucket_opt.value();
// Proceed with recovery via cluster bootstrap.
} else {
const auto& err = res.error();
if (
err == cloud_metadata::error_outcome::no_matching_metadata) {
vlog(
clusterlog.info,
"No cluster manifest in bucket {}, proceeding without "
"recovery",
bucket_opt.value());
// Fall through to regular cluster bootstrap.
} else {
vlog(
clusterlog.error,
"Error looking for cluster recovery material in cloud, "
"retrying: {}",
err);
co_await ss::sleep_abortable(
retry_jitter.next_duration(), _as.local());
continue;
}
}
}

vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid);
const std::error_code errc = co_await replicate_and_wait(
_stm,
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class controller {

ss::future<> cluster_creation_hook(cluster_discovery& discovery);

std::optional<cloud_storage_clients::bucket_name> get_configured_bucket();

// Checks configuration invariants stored in kvstore
ss::future<> validate_configuration_invariants();

Expand Down
47 changes: 47 additions & 0 deletions tests/rptest/tests/cluster_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,50 @@ def cluster_recovery_complete():
if u in l and "ALLOW" in l and "DESCRIBE" in l:
found = True
assert found, f"Couldn't find {u} in {acls_lines}"

    @cluster(num_nodes=4)
    def test_bootstrap_with_recovery(self):
        """
        Smoke test that configuring automated recovery at bootstrap will kick
        in as appropriate.
        """
        # Enable recovery-on-bootstrap on the running cluster so the setting
        # is part of the uploaded cluster metadata.
        rpk = RpkTool(self.redpanda)
        rpk.cluster_config_set(
            "cloud_storage_attempt_cluster_recovery_on_bootstrap", True)
        # Produce a fixed batch of records to each topic so there is data to
        # upload (and later recover).
        for t in self.topics:
            KgoVerifierProducer.oneshot(self.test_context,
                                        self.redpanda,
                                        t.name,
                                        self.message_size,
                                        100,
                                        batch_max_bytes=self.message_size * 8,
                                        timeout_sec=60)
        # Wait until uploads for all topics have caught up to what was
        # produced.
        quiesce_uploads(self.redpanda, [t.name for t in self.topics],
                        timeout_sec=60)
        # NOTE(review): presumably gives trailing (e.g. cluster metadata)
        # uploads time to land before stopping — confirm.
        time.sleep(5)

        # Simulate total local-disk loss: stop every node and wipe its data
        # directory so the cluster must bootstrap from scratch.
        self.redpanda.stop()
        for n in self.redpanda.nodes:
            self.redpanda.remove_local_data(n)

        # Restart the nodes, overriding the recovery bootstrap config.
        extra_rp_conf = dict(
            cloud_storage_attempt_cluster_recovery_on_bootstrap=True)
        self.redpanda.set_extra_rp_conf(extra_rp_conf)
        self.redpanda.write_bootstrap_cluster_config()
        self.redpanda.restart_nodes(self.redpanda.nodes,
                                    override_cfg_params=extra_rp_conf)

        # We should see a recovery begin automatically.
        self.redpanda._admin.await_stable_leader("controller",
                                                 partition=0,
                                                 namespace='redpanda',
                                                 timeout_s=60,
                                                 backoff_s=2)

        # Poll the admin API until the recovery state machine reports the
        # terminal "complete" stage.
        def cluster_recovery_complete():
            return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status(
            ).json()["state"]

        wait_until(cluster_recovery_complete, timeout_sec=60, backoff_sec=1)
        # NOTE(review): final restart appears to check the recovered cluster
        # comes back cleanly (no second recovery attempt) — confirm intent.
        self.redpanda.restart_nodes(self.redpanda.nodes)

0 comments on commit 2ec9858

Please sign in to comment.