Skip to content

Commit

Permalink
Merge pull request #8960 from ZeDRoman/delete-orphan-topic-files-fix
Browse files Browse the repository at this point in the history
Clear topics orphan files with fix
  • Loading branch information
ZeDRoman authored Mar 28, 2023
2 parents 8479d7d + 2eb0976 commit 81610d8
Show file tree
Hide file tree
Showing 6 changed files with 444 additions and 24 deletions.
133 changes: 121 additions & 12 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <seastar/core/smp.hh>
#include <seastar/util/later.hh>

#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>

Expand All @@ -56,20 +57,12 @@
namespace cluster {
namespace {

/// Returns true when at least one entry of `replicas` has a `node_id` equal
/// to `id`, false otherwise (including for an empty vector).
inline bool contains_node(
  model::node_id id, const std::vector<model::broker_shard>& replicas) {
    const auto placed_on_node = [id](const model::broker_shard& replica) {
        return replica.node_id == id;
    };
    return std::find_if(replicas.cbegin(), replicas.cend(), placed_on_node)
           != replicas.cend();
}

/**
 * Tells whether `delta` should be treated as a cross core update for node
 * `self`.
 *
 * Returns false when the delta carries no previous replica set. Otherwise the
 * result is true only when every check in the return expression holds: the
 * new assignment passes has_local_replicas for `self`, the previous replica
 * set contains `self` as a node, and the previous replica set does not pass
 * has_local_replicas for `self`.
 *
 * NOTE(review): has_local_replicas is defined elsewhere; presumably it checks
 * for a replica on the current shard of this node - confirm.
 * NOTE(review): the two consecutive contains_node checks differ only in
 * argument order (node first vs. replica set first) - confirm which overload
 * is the intended one.
 */
bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) {
// Without a previous replica set there is nothing to compare against.
if (!delta.previous_replica_set) {
return false;
}
return has_local_replicas(self, delta.new_assignment.replicas)
&& contains_node(self, *delta.previous_replica_set)
&& contains_node(*delta.previous_replica_set, self)
&& !has_local_replicas(self, *delta.previous_replica_set);
}

Expand Down Expand Up @@ -219,7 +212,7 @@ std::error_code check_configuration_update(
change_revision);
return errc::partition_configuration_revision_not_updated;
}
const bool includes_self = contains_node(self, bs);
const bool includes_self = contains_node(bs, self);

/*
* if configuration includes current node, we expect configuration to be
Expand Down Expand Up @@ -326,10 +319,76 @@ void controller_backend::setup_metrics() {
"Number of partitions with ongoing/requested operations")),
});
}
/**
 * Builds a snapshot of the local topic table: a map from ntp to the revision
 * recorded in the topic metadata, holding the partitions that are expected to
 * be present on this node's disk.
 *
 * An entry is added for
 *  - every partition of a topic that is not replicable (stored with
 *    revision 0), and
 *  - every partition of a replicable topic for which `current_node` appears
 *    in the current assignment, in the target replica set, or in the
 *    previous replica set (stored with the topic revision).
 *
 * The snapshot is used to detect orphan topic files: a partition directory
 * whose ntp is missing from the snapshot, or whose revision is lower than the
 * snapshot revision, is considered orphan.
 */
static absl::flat_hash_map<model::ntp, model::revision_id>
create_topic_table_snapshot(
  ss::sharded<cluster::topic_table>& topics, model::node_id current_node) {
    absl::flat_hash_map<model::ntp, model::revision_id> snapshot;

    // True when an optional replica set is engaged and lists this node.
    const auto lists_current_node = [current_node](const auto& replica_set) {
        return replica_set
               && cluster::contains_node(*replica_set, current_node);
    };

    for (const auto& nt : topics.local().all_topics()) {
        auto ntp_view = model::topic_namespace_view(nt);
        auto ntp_meta = topics.local().get_topic_metadata(ntp_view);
        if (!ntp_meta) {
            continue;
        }
        for (const auto& p : ntp_meta->get_assignments()) {
            auto ntp = model::ntp(nt.ns, nt.tp, p.id);
            if (!ntp_meta->is_topic_replicable()) {
                snapshot.emplace(ntp, 0);
                continue;
            }
            auto revision_id = ntp_meta->get_revision();
            // Short-circuit evaluation keeps the lookups in the same order:
            // current assignment, then target set, then previous set.
            const bool expected_on_this_node
              = cluster::contains_node(p.replicas, current_node)
                || lists_current_node(
                  topics.local().get_target_replica_set(ntp))
                || lists_current_node(
                  topics.local().get_previous_replica_set(ntp));
            if (expected_on_this_node) {
                snapshot.emplace(ntp, revision_id);
            }
        }
    }
    return snapshot;
}

ss::future<> controller_backend::start() {
setup_metrics();
return bootstrap_controller_backend().then([this] {
if (ss::this_shard_id() == cluster::controller_stm_shard) {
auto bootstrap_revision = _topics.local().last_applied_revision();
auto snapshot = create_topic_table_snapshot(_topics, _self);
ssx::spawn_with_gate(
_gate,
[this, bootstrap_revision, snapshot = std::move(snapshot)] {
return clear_orphan_topic_files(
bootstrap_revision, std::move(snapshot))
.handle_exception([](const std::exception_ptr& err) {
vlog(
clusterlog.error,
"Exception while cleaning oprhan files {}",
err);
});
});
}
start_topics_reconciliation_loop();
_housekeeping_timer.set_callback([this] { housekeeping(); });
_housekeeping_timer.arm(_housekeeping_timer_interval);
Expand Down Expand Up @@ -585,6 +644,56 @@ ss::future<> controller_backend::fetch_deltas() {
});
}

/**
 * Decides whether the on-disk directory of `ntp` is orphan.
 *
 * The directory is kept (returns false) when
 *  - its revision is greater than `last_applied_revision`, or
 *  - `ntp` is present in `topic_table_snapshot` and the directory revision is
 *    not lower than the revision stored in the snapshot.
 * In every other case the directory is reported as orphan (returns true).
 */
bool topic_files_are_orphan(
  const model::ntp& ntp,
  storage::partition_path::metadata ntp_directory_data,
  const absl::flat_hash_map<model::ntp, model::revision_id>&
    topic_table_snapshot,
  model::revision_id last_applied_revision) {
    vlog(clusterlog.debug, "Checking topic files for ntp {} are orphan", ntp);

    const auto directory_revision = ntp_directory_data.revision_id;
    if (directory_revision > last_applied_revision) {
        return false;
    }

    const auto known = topic_table_snapshot.find(ntp);
    if (known == topic_table_snapshot.end()) {
        // Not in the snapshot at all.
        return true;
    }
    // In the snapshot: orphan only when the directory revision is older.
    return directory_revision < known->second;
}

/**
 * Removes orphan topic files from the data directory.
 *
 * Collects the namespaces of all topics in the local topic table (the kafka
 * namespace is always included) and asks the log manager to remove every
 * partition directory that topic_files_are_orphan reports as orphan for the
 * given snapshot and bootstrap revision.
 *
 * @param bootstrap_revision revision passed to topic_files_are_orphan as the
 *        last applied revision
 * @param topic_table_snapshot ntp -> revision map; moved into the filter
 * @return the future returned by log_manager::remove_orphan_files
 */
ss::future<> controller_backend::clear_orphan_topic_files(
  model::revision_id bootstrap_revision,
  absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot) {
    vlog(
      clusterlog.info,
      "Cleaning up orphan topic files. bootstrap_revision: {}",
      bootstrap_revision);
    // Init with default namespace to clean if there is no topics
    absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
    for (const auto& t : _topics.local().all_topics()) {
        namespaces.emplace(t.ns);
    }

    // The filter is handed to remove_orphan_files, which returns a future, so
    // it may be invoked after this function has returned. It therefore owns
    // everything it uses by value; no default by-reference capture.
    return _storage.local().log_mgr().remove_orphan_files(
      _data_directory,
      std::move(namespaces),
      [bootstrap_revision,
       topic_table_snapshot = std::move(topic_table_snapshot)](
        model::ntp ntp, storage::partition_path::metadata p) {
          return topic_files_are_orphan(
            ntp, p, topic_table_snapshot, bootstrap_revision);
      });
}

void controller_backend::start_topics_reconciliation_loop() {
ssx::spawn_with_gate(_gate, [this] {
return ss::do_until(
Expand Down Expand Up @@ -981,7 +1090,7 @@ controller_backend::process_partition_reconfiguration(
* 1) shutdown partition instance
* 2) create instance on target remote core
*/
if (contains_node(_self, target_assignment.replicas)) {
if (contains_node(target_assignment.replicas, _self)) {
co_return co_await shutdown_on_current_shard(
std::move(ntp), command_rev);
}
Expand All @@ -1006,7 +1115,7 @@ controller_backend::process_partition_reconfiguration(
// Wait fo the operation to be finished on one of the nodes
co_return errc::waiting_for_reconfiguration_finish;
}
const auto cross_core_move = contains_node(_self, previous_replicas)
const auto cross_core_move = contains_node(previous_replicas, _self)
&& !has_local_replicas(
_self, previous_replicas);
/**
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ class controller_backend

// Topics
ss::future<> bootstrap_controller_backend();
/**
 * Cleans orphan topic files on redpanda startup.
 * Orphan topic files are files left on disk after some node manipulations
 * that redpanda no longer knows about, so they cannot be removed through the
 * regular path. Currently orphan topic files may be left behind when
 * redpanda is restarted while a partition deletion operation was in progress
 * but had not finished yet.
 * We assume that orphan files can only be left behind across a redpanda
 * restart, so the cleanup runs on startup, after bootstrap, when redpanda
 * already knows about all the topics it should contain on disk.
 **/
ss::future<> clear_orphan_topic_files(
model::revision_id bootstrap_revision,
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot);
void start_topics_reconciliation_loop();

ss::future<> fetch_deltas();
Expand Down
20 changes: 20 additions & 0 deletions src/v/storage/fs_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ class segment_full_path;

class partition_path {
public:
/// Values encoded in a partition directory name, as produced by
/// parse_partition_directory.
struct metadata {
// First numeric component of the directory name.
model::partition_id partition_id;
// Second numeric component of the directory name.
model::revision_id revision_id;
};

/// Parse ntp directory name
///
/// Accepts names of the form `<digits>_<digits>`; the first number becomes
/// the partition id and the second the revision id.
///
/// \param name directory entry name (not a full path)
/// \return parsed metadata, or std::nullopt when the name does not match the
///         pattern or one of the numbers does not fit into uint64_t
///
/// NOTE(review): the numbers are parsed as uint64_t and then converted to
/// model::partition_id / model::revision_id; confirm the ranges of the
/// underlying types if very large values are possible.
static std::optional<metadata>
parse_partition_directory(const ss::sstring& name) {
    // Compiled once instead of on every call; this function is invoked for
    // each directory entry during a walk.
    static const std::regex re(R"(^(\d+)_(\d+)$)");
    std::cmatch match;
    if (!std::regex_match(name.c_str(), match, re)) {
        return std::nullopt;
    }

    // The pattern accepts digit runs of any length, so the conversion can
    // overflow. Report that as "not a partition directory" instead of
    // letting boost::bad_lexical_cast escape to the caller.
    uint64_t partition = 0;
    uint64_t revision = 0;
    if (
      !boost::conversion::try_lexical_convert(match[1].str(), partition)
      || !boost::conversion::try_lexical_convert(match[2].str(), revision)) {
        return std::nullopt;
    }
    return metadata{
      .partition_id = model::partition_id(partition),
      .revision_id = model::revision_id(revision)};
}

partition_path(const ntp_config& ntpc);

operator std::filesystem::path() const { return make_path(); }
Expand Down
95 changes: 95 additions & 0 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

#include "storage/log_manager.h"

#include "cluster/cluster_utils.h"
#include "cluster/topic_table.h"
#include "config/configuration.h"
#include "likely.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timestamp.h"
#include "resource_mgmt/io_priority.h"
#include "ssx/async-clear.h"
Expand Down Expand Up @@ -46,6 +49,7 @@
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>

#include <boost/algorithm/string/predicate.hpp>
#include <fmt/format.h>
Expand Down Expand Up @@ -425,6 +429,97 @@ ss::future<> log_manager::remove(model::ntp ntp) {
co_await dispatch_topic_dir_deletion(topic_dir);
}

/**
 * Walks a single topic directory and removes the partition directories that
 * `orphan_filter` reports as orphan.
 *
 * Entries whose name cannot be parsed by
 * partition_path::parse_partition_directory are ignored. A
 * std::filesystem::filesystem_error raised while removing a directory is
 * logged and swallowed.
 *
 * @param topic_directory_path path of the topic directory to scan
 * @param nt namespace and topic used to build the ntp passed to the filter
 * @param orphan_filter predicate deciding whether a partition directory is
 *        orphan; captured by reference by the walk callback
 */
ss::future<> remove_orphan_partition_files(
  ss::sstring topic_directory_path,
  model::topic_namespace nt,
  ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>&
    orphan_filter) {
    return directory_walker::walk(
      topic_directory_path,
      [topic_directory_path, nt, &orphan_filter](
        ss::directory_entry entry) -> ss::future<> {
          auto parsed = partition_path::parse_partition_directory(entry.name);
          if (!parsed) {
              // Not a partition directory name.
              return ss::now();
          }

          auto ntp = model::ntp(nt.ns, nt.tp, parsed->partition_id);
          if (!orphan_filter(ntp, *parsed)) {
              return ss::now();
          }

          auto ntp_directory = std::filesystem::path(topic_directory_path)
                               / std::filesystem::path(entry.name);
          vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory);
          return ss::recursive_remove_directory(ntp_directory)
            .handle_exception_type(
              [ntp_directory](std::filesystem::filesystem_error const& err) {
                  vlog(
                    stlog.error,
                    "Exception while cleaning oprhan files for {} Error: {}",
                    ntp_directory,
                    err);
              });
      });
}

/**
 * Removes orphan partition directories below `data_directory_path`.
 *
 * For every namespace in `namespaces` whose directory exists, each topic
 * sub-directory is scanned with remove_orphan_partition_files using
 * `orphan_filter`, after which dispatch_topic_dir_deletion is invoked for
 * that topic directory. std::filesystem::filesystem_error exceptions are
 * logged and swallowed, both per topic and per namespace walk. Nothing is
 * done when the data directory does not exist.
 */
ss::future<> log_manager::remove_orphan_files(
  ss::sstring data_directory_path,
  absl::flat_hash_set<model::ns> namespaces,
  ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
    orphan_filter) {
    const bool have_data_dir = co_await ss::file_exists(data_directory_path);
    if (!have_data_dir) {
        co_return;
    }

    for (const auto& ns : namespaces) {
        auto namespace_directory = std::filesystem::path(data_directory_path)
                                   / std::filesystem::path(ss::sstring(ns));
        const bool have_namespace_dir = co_await ss::file_exists(
          namespace_directory.string());
        if (!have_namespace_dir) {
            continue;
        }

        // The references captured below stay valid because the walk is
        // awaited before this loop iteration ends.
        co_await directory_walker::walk(
          namespace_directory.string(),
          [this, &namespace_directory, &ns, &orphan_filter](
            ss::directory_entry entry) -> ss::future<> {
              // Only sub-directories are treated as topic directories.
              if (entry.type != ss::directory_entry_type::directory) {
                  return ss::now();
              }
              auto topic_directory = namespace_directory
                                     / std::filesystem::path(entry.name);
              auto topic_ns = model::topic_namespace(
                ns, model::topic(entry.name));
              return remove_orphan_partition_files(
                       topic_directory.string(),
                       std::move(topic_ns),
                       orphan_filter)
                .then([this, topic_directory]() {
                    vlog(
                      stlog.info,
                      "Trying to clean up topic directory {} ",
                      topic_directory);
                    return dispatch_topic_dir_deletion(
                      topic_directory.string());
                })
                .handle_exception_type(
                  [](std::filesystem::filesystem_error const& err) {
                      vlog(
                        stlog.error,
                        "Exception while cleaning oprhan files {}",
                        err);
                  });
          })
          .handle_exception_type(
            [](std::filesystem::filesystem_error const& err) {
                vlog(
                  stlog.error,
                  "Exception while cleaning oprhan files {}",
                  err);
            });
    }
}

ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) {
return ss::smp::submit_to(
0,
Expand Down
12 changes: 12 additions & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

#pragma once

#include "cluster/topic_table.h"
#include "config/property.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "random/simple_time_jitter.h"
#include "seastarx.h"
#include "storage/batch_cache.h"
Expand Down Expand Up @@ -168,6 +170,16 @@ class log_manager {
* rebalancing partitions across the cluster, etc...
*/
ss::future<> remove(model::ntp);
/**
 * This function walks through the entire directory structure in an async
 * fiber to remove all orphan files in that directory. Whether a file is
 * orphan is decided by the supplied orphan_filter.
 */
ss::future<> remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
orphan_filter);

ss::future<> stop();

Expand Down
Loading

0 comments on commit 81610d8

Please sign in to comment.