Skip to content

Commit

Permalink
controller: test orphan deletion fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ZeDRoman committed Feb 21, 2023
1 parent 65ba934 commit 0ecefac
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 111 deletions.
180 changes: 71 additions & 109 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <seastar/core/smp.hh>
#include <seastar/util/later.hh>

#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>

Expand Down Expand Up @@ -289,21 +290,68 @@ void controller_backend::setup_metrics() {
});
}

/**
 * Builds a map from ntp to revision for the partitions of the local
 * topic_table instance that involve `current_node`.
 *
 * For every topic that has metadata, each partition assignment is examined:
 *  - if the topic is not replicable, the partition is always recorded with
 *    revision 0;
 *  - otherwise the partition is recorded with the topic revision when
 *    `current_node` appears in the assignment's replicas, in the target
 *    replica set, or in the previous replica set (checked in that order,
 *    stopping at the first match).
 *
 * Partitions that match none of the above are left out of the result.
 *
 * @param topics        sharded topic table; only the shard-local instance is
 *                      read
 * @param current_node  node id looked up in the replica sets
 * @return map of ntp -> revision as described above
 */
static absl::flat_hash_map<model::ntp, model::revision_id>
create_topic_table_snapshot(
  ss::sharded<cluster::topic_table>& topics, model::node_id current_node) {
    auto& table = topics.local();
    absl::flat_hash_map<model::ntp, model::revision_id> result;

    for (const auto& topic : table.all_topics()) {
        auto topic_view = model::topic_namespace_view(topic);
        auto metadata = table.get_topic_metadata(topic_view);
        if (!metadata) {
            continue;
        }

        for (const auto& assignment : metadata->get_assignments()) {
            auto ntp = model::ntp(topic.ns, topic.tp, assignment.id);

            // Non-replicable topics are recorded unconditionally.
            if (!metadata->is_topic_replicable()) {
                result.emplace(ntp, 0);
                continue;
            }
            // The revision is read only after the replicable check above;
            // keep this ordering.
            auto topic_revision = metadata->get_revision();

            const auto in_current_replicas = [&] {
                return cluster::contains_node(
                  assignment.replicas, current_node);
            };
            const auto in_target_replicas = [&] {
                auto target = table.get_target_replica_set(ntp);
                return target
                       && cluster::contains_node(*target, current_node);
            };
            const auto in_previous_replicas = [&] {
                auto previous = table.get_previous_replica_set(ntp);
                return previous
                       && cluster::contains_node(*previous, current_node);
            };

            // Short-circuit evaluation keeps the lookups lazy: target and
            // previous replica sets are fetched only when the earlier
            // checks did not match.
            if (
              in_current_replicas() || in_target_replicas()
              || in_previous_replicas()) {
                result.emplace(ntp, topic_revision);
            }
        }
    }
    return result;
}

ss::future<> controller_backend::start() {
setup_metrics();
return bootstrap_controller_backend().then([this] {
if (ss::this_shard_id() == cluster::controller_stm_shard) {
auto bootstrap_revision = _topics.local().last_applied_revision();
ssx::spawn_with_gate(_gate, [this, bootstrap_revision] {
return clear_orphan_topic_files(bootstrap_revision)
.handle_exception_type(
[](std::filesystem::filesystem_error const& err) {
vlog(
clusterlog.error,
"Exception while cleaning oprhan files {}",
err);
});
});
auto snapshot = create_topic_table_snapshot(_topics, _self);
ssx::spawn_with_gate(
_gate,
[this, bootstrap_revision, snapshot = std::move(snapshot)] {
return clear_orphan_topic_files(
bootstrap_revision, std::move(snapshot))
.handle_exception_type(
[](std::filesystem::filesystem_error const& err) {
vlog(
clusterlog.error,
"Exception while cleaning oprhan files {}",
err);
});
});
}
start_topics_reconciliation_loop();
_housekeeping_timer.set_callback([this] { housekeeping(); });
Expand Down Expand Up @@ -618,14 +666,10 @@ ss::future<> controller_backend::fetch_deltas() {
bool topic_files_are_orphan(
const model::ntp& ntp,
storage::partition_path::metadata ntp_directory_data,
ss::sharded<cluster::topic_table>& _topics,
model::revision_id last_applied_revision,
model::node_id current_node) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Cur Node {}",
ntp,
current_node);
const absl::flat_hash_map<model::ntp, model::revision_id>&
topic_table_snapshot,
model::revision_id last_applied_revision) {
vlog(clusterlog.debug, "Cleaning up orphan topic files. NTP {}", ntp);

if (ntp_directory_data.revision_id > last_applied_revision) {
vlog(
Expand All @@ -637,102 +681,18 @@ bool topic_files_are_orphan(
last_applied_revision);
return false;
}
auto ntp_view = model::topic_namespace_view(ntp);
if (!_topics.local().contains(ntp_view)) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Clean doesn't exist in "
"topic_table",
ntp);
return true;
}
auto ntp_meta = _topics.local().get_topic_metadata(ntp_view);
if (ntp_meta && !ntp_meta->is_topic_replicable()) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip topic not replicable",
ntp);
return false;
}
if (ntp_meta && ntp_meta->get_revision() > ntp_directory_data.revision_id) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Clean meta_revision {} is "
"higher than directory {}",
ntp,
ntp_meta->get_revision(),
ntp_directory_data.revision_id);
return true;
}

auto current_replica_set = _topics.local().get_partition_assignment(ntp);
if (cluster::contains_node(current_replica_set->replicas, current_node)) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip contains in current "
"replica set",
ntp);
return false;
}
for (const auto& i : current_replica_set->replicas) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip Current replica: {}",
ntp,
i.node_id);
}

auto target_replica_set = _topics.local().get_target_replica_set(ntp);
if (target_replica_set) {
for (const auto& i : *target_replica_set) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip Target replica: {}",
ntp,
i.node_id);
}
}
if (
target_replica_set
&& cluster::contains_node(*target_replica_set, current_node)) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip contains in target "
"replica set",
ntp);
return false;
}

auto previous_replica_set = _topics.local().get_previous_replica_set(ntp);
if (previous_replica_set) {
for (const auto& i : *previous_replica_set) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip Previous replica: "
"{}",
ntp,
i.node_id);
}
}
if (
previous_replica_set
&& cluster::contains_node(*previous_replica_set, current_node)) {
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Skip contains in previous "
"replica set",
ntp);
topic_table_snapshot.contains(ntp)
&& ntp_directory_data.revision_id
>= topic_table_snapshot.find(ntp)->second) {
return false;
}
vlog(
clusterlog.debug,
"Cleaning up orphan topic files. NTP {} Can clean",
ntp);
return true;
}

ss::future<> controller_backend::clear_orphan_topic_files(
model::revision_id bootstrap_revision) {
model::revision_id bootstrap_revision,
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot) {
vlog(
clusterlog.info,
"Cleaning up orphan topic files. bootstrap_revision: {}",
Expand All @@ -746,10 +706,12 @@ ss::future<> controller_backend::clear_orphan_topic_files(
return _storage.local().log_mgr().remove_orphan_files(
_data_directory,
std::move(namespaces),
[&, bootstrap_revision](
[&,
bootstrap_revision,
topic_table_snapshot = std::move(topic_table_snapshot)](
model::ntp ntp, storage::partition_path::metadata p) {
return topic_files_are_orphan(
ntp, p, _topics, bootstrap_revision, _self);
ntp, p, topic_table_snapshot, bootstrap_revision);
});
}

Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ class controller_backend

// Topics
ss::future<> bootstrap_controller_backend();
ss::future<>
clear_orphan_topic_files(model::revision_id bootstrap_revision);
ss::future<> clear_orphan_topic_files(
model::revision_id bootstrap_revision,
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot);
void start_topics_reconciliation_loop();

ss::future<> fetch_deltas();
Expand Down

0 comments on commit 0ecefac

Please sign in to comment.