Skip to content

Commit

Permalink
cluster: look for cluster manifest and start recovery on bootstrap
Browse files Browse the repository at this point in the history
This updates the cluster bootstrap sequence such that, when configured,
Redpanda will look for cluster metadata in the cloud and then
immediately start a recovery if any is found.
  • Loading branch information
andrwng committed Nov 24, 2023
1 parent 55984d0 commit 2f6455b
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 8 deletions.
84 changes: 76 additions & 8 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "cluster/bootstrap_backend.h"
#include "cluster/cloud_metadata/cluster_recovery_backend.h"
#include "cluster/cloud_metadata/error_outcome.h"
#include "cluster/cloud_metadata/manifest_downloads.h"
#include "cluster/cloud_metadata/offsets_upload_rpc_types.h"
#include "cluster/cloud_metadata/producer_id_recovery_manager.h"
#include "cluster/cloud_metadata/uploader.h"
Expand Down Expand Up @@ -111,6 +113,17 @@ controller::controller(
// includes for destructors of all its members (e.g. the metadata uploader).
controller::~controller() = default;

// Returns the bucket named by the cloud storage bucket configuration, or
// std::nullopt when that bucket cannot be used: the property has not been
// overridden, it holds no value, or the local cloud storage API instance has
// not been initialized.
std::optional<cloud_storage_clients::bucket_name>
controller::get_configured_bucket() {
    auto& bucket_cfg = cloud_storage::configuration::get_bucket_config();
    // Evaluated left to right with short-circuiting, so the property value is
    // only inspected once it is known to be overridden.
    const bool bucket_usable = bucket_cfg.is_overriden()
                               && bucket_cfg().has_value()
                               && _cloud_storage_api.local_is_initialized();
    if (bucket_usable) {
        return cloud_storage_clients::bucket_name(bucket_cfg().value());
    }
    return std::nullopt;
}

ss::future<> controller::wire_up() {
return _as.start()
.then([this] { return _members_table.start(); })
Expand Down Expand Up @@ -613,14 +626,11 @@ ss::future<> controller::start(
&partition_balancer_backend::start);
})
.then([this, offsets_uploader, producer_id_recovery] {
auto& bucket_property
= cloud_storage::configuration::get_bucket_config();
if (
!bucket_property.is_overriden() || !bucket_property().has_value()
|| !_cloud_storage_api.local_is_initialized()) {
auto bucket_opt = get_configured_bucket();
if (!bucket_opt.has_value()) {
return;
}
cloud_storage_clients::bucket_name bucket(bucket_property().value());
cloud_storage_clients::bucket_name bucket = bucket_opt.value();
_metadata_uploader = std::make_unique<cloud_metadata::uploader>(
_raft_manager.local(),
_storage.local(),
Expand Down Expand Up @@ -746,8 +756,28 @@ ss::future<> controller::stop() {
});
}

ss::future<>
controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
// Looks in the configured cloud storage bucket for the highest cluster
// metadata manifest.
//
// Resolves to error_outcome::no_matching_metadata without touching the cloud
// when cloud_storage_attempt_cluster_recovery_on_bootstrap is disabled or when
// no usable bucket is configured. Otherwise resolves to whatever
// download_highest_manifest_in_bucket() yields.
ss::future<cloud_metadata::cluster_manifest_result>
controller::find_cluster_manifest_in_cloud() {
    if (!config::shard_local_cfg()
           .cloud_storage_attempt_cluster_recovery_on_bootstrap.value()) {
        co_return cloud_metadata::error_outcome::no_matching_metadata;
    }
    // Use the shared helper rather than repeating the bucket/remote checks,
    // so this path agrees with the other callers of get_configured_bucket().
    auto bucket_opt = get_configured_bucket();
    if (!bucket_opt.has_value()) {
        vlog(
          clusterlog.debug,
          "Cloud not configured, skipping cluster recovery check");
        co_return cloud_metadata::error_outcome::no_matching_metadata;
    }
    // NOTE(review): this retry node is built without a timeout or backoff,
    // unlike the (300s, 5s) node used by create_cluster() -- confirm that is
    // intended.
    retry_chain_node retry_node(_as.local());
    co_return co_await cloud_metadata::download_highest_manifest_in_bucket(
      _cloud_storage_api.local(), bucket_opt.value(), retry_node);
}

ss::future<> controller::create_cluster(bootstrap_cluster_cmd_data cmd_data) {
vassert(
ss::this_shard_id() == controller_stm_shard,
"Cluster can only be created from controller_stm_shard");
Expand Down Expand Up @@ -775,6 +805,44 @@ controller::create_cluster(const bootstrap_cluster_cmd_data cmd_data) {
co_return;
}

// Check if there is any cluster metadata in the cloud.
auto bucket_opt = get_configured_bucket();
if (bucket_opt.has_value()) {
retry_chain_node retry_node(_as.local(), 300s, 5s);
auto res
= co_await cloud_metadata::download_highest_manifest_in_bucket(
_cloud_storage_api.local(), bucket_opt.value(), retry_node);
if (res.has_value()) {
vlog(
clusterlog.info,
"Found cluster metadata manifest {} in bucket {}",
res.value(),
bucket_opt.value());
cmd_data.recovery_manifest = std::move(res.value());
cmd_data.recovery_bucket = bucket_opt.value();
} else {
const auto& err = res.error();
if (
err == cloud_metadata::error_outcome::no_matching_metadata) {
vlog(
clusterlog.info,
"No cluster manifest in bucket {}, proceeding without "
"recovery",
bucket_opt.value());
} else {
vlog(
clusterlog.error,
"Error looking for cluster recovery material in cloud, "
"retrying: {}",
err);
co_await ss::sleep_abortable(
retry_jitter.next_duration(), _as.local());
continue;
}
// No metadata in cloud: this isn't a recovery cluster.
}
}

vlog(clusterlog.info, "Creating cluster UUID {}", cmd_data.uuid);
const std::error_code errc = co_await replicate_and_wait(
_stm,
Expand Down
13 changes: 13 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "cloud_storage/fwd.h"
#include "cluster/cloud_metadata/cluster_manifest.h"
#include "cluster/cloud_metadata/producer_id_recovery_manager.h"
#include "cluster/controller_probe.h"
#include "cluster/controller_stm.h"
Expand Down Expand Up @@ -241,6 +242,18 @@ class controller {

ss::future<> cluster_creation_hook(cluster_discovery& discovery);

/**
 * Returns the bucket from the cloud storage bucket configuration.
 *
 * Returns \c std::nullopt if the bucket property is not overridden, has no
 * value, or the local cloud storage API instance is not initialized.
 */
std::optional<cloud_storage_clients::bucket_name> get_configured_bucket();

/**
* Looks in the cloud for a cluster metadata manifest with which to recover
* a cluster.
*
* Returns \c no_matching_metadata if there is no metadata or cloud isn't
* configured.
*/
ss::future<cloud_metadata::cluster_manifest_result>
find_cluster_manifest_in_cloud();

// Checks configuration invariants stored in kvstore
ss::future<> validate_configuration_invariants();

Expand Down
47 changes: 47 additions & 0 deletions tests/rptest/tests/cluster_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,50 @@ def cluster_recovery_complete():
if u in l and "ALLOW" in l and "DESCRIBE" in l:
found = True
assert found, f"Couldn't find {u} in {acls_lines}"

@cluster(num_nodes=4)
def test_bootstrap_with_recovery(self):
"""
Smoke test that configuring automated recovery at bootstrap will kick
in as appropriate.
"""
# Turn on the bootstrap-time recovery setting in the running cluster.
rpk = RpkTool(self.redpanda)
rpk.cluster_config_set(
"cloud_storage_attempt_cluster_recovery_on_bootstrap", True)
# Produce to every topic. The positional 100 is presumably the message
# count -- confirm against KgoVerifierProducer.oneshot.
for t in self.topics:
KgoVerifierProducer.oneshot(self.test_context,
self.redpanda,
t.name,
self.message_size,
100,
batch_max_bytes=self.message_size * 8,
timeout_sec=60)
# Wait (up to 60s) for uploads of all topics to quiesce.
quiesce_uploads(self.redpanda, [t.name for t in self.topics],
timeout_sec=60)
# NOTE(review): fixed 5s pause with no stated reason -- presumably gives
# a cluster metadata upload time to happen; confirm.
time.sleep(5)

# Stop the cluster and remove the local data of every node.
self.redpanda.stop()
for n in self.redpanda.nodes:
self.redpanda.remove_local_data(n)

# Restart the nodes, overriding the recovery bootstrap config.
extra_rp_conf = dict(
cloud_storage_attempt_cluster_recovery_on_bootstrap=True)
self.redpanda.set_extra_rp_conf(extra_rp_conf)
self.redpanda.write_bootstrap_cluster_config()
self.redpanda.restart_nodes(self.redpanda.nodes,
override_cfg_params=extra_rp_conf)

# We should see a recovery begin automatically.
# First wait (up to 60s) for the controller partition to have a stable
# leader.
self.redpanda._admin.await_stable_leader("controller",
partition=0,
namespace='redpanda',
timeout_s=60,
backoff_s=2)

# True once the "state" field of the admin API's cluster recovery status
# contains the complete stage.
def cluster_recovery_complete():
return "recovery_stage::complete" in self.redpanda._admin.get_cluster_recovery_status(
).json()["state"]

wait_until(cluster_recovery_complete, timeout_sec=30, backoff_sec=1)
# Restart once more after recovery has completed -- presumably to check
# that the nodes come back up cleanly; confirm.
self.redpanda.restart_nodes(self.redpanda.nodes)

0 comments on commit 2f6455b

Please sign in to comment.