Skip to content

Commit

Permalink
controller: clean up topic orphan files
Browse files Browse the repository at this point in the history
When redpanda is restarted while delete operation is not finish
Partition files might be left on disk.
We need to cleanup orphan partition files after bootstrap.
  • Loading branch information
ZeDRoman committed Mar 22, 2023
1 parent 782eefb commit 6f9be07
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 0 deletions.
118 changes: 118 additions & 0 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <seastar/core/smp.hh>
#include <seastar/util/later.hh>

#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>

Expand Down Expand Up @@ -288,10 +289,77 @@ void controller_backend::setup_metrics() {
"Number of partitions with ongoing/requested operations")),
});
}
/**
* Create snapshot of topic table that contains all ntp that
* must be presented on local disk storage and their revision
* This snapshot will help to define orphan topic files
* If topic is not presented in this snapshot or its revision
* its revision is less than in this snapshot then this topic
* directory is orphan
*/
static absl::flat_hash_map<model::ntp, model::revision_id>
create_topic_table_snapshot(
ss::sharded<cluster::topic_table>& topics, model::node_id current_node) {
absl::flat_hash_map<model::ntp, model::revision_id> snapshot;

for (const auto& nt : topics.local().all_topics()) {
auto ntp_view = model::topic_namespace_view(nt);
auto ntp_meta = topics.local().get_topic_metadata(ntp_view);
if (!ntp_meta) {
continue;
}
for (const auto& p : ntp_meta->get_assignments()) {
auto ntp = model::ntp(nt.ns, nt.tp, p.id);
if (!ntp_meta->is_topic_replicable()) {
snapshot.emplace(ntp, 0);
continue;
}
auto revision_id = ntp_meta->get_revision();
if (cluster::contains_node(p.replicas, current_node)) {
snapshot.emplace(ntp, revision_id);
continue;
}
auto target_replica_set = topics.local().get_target_replica_set(
ntp);
if (
target_replica_set
&& cluster::contains_node(*target_replica_set, current_node)) {
snapshot.emplace(ntp, revision_id);
continue;
}
auto previous_replica_set = topics.local().get_previous_replica_set(
ntp);
if (
previous_replica_set
&& cluster::contains_node(*previous_replica_set, current_node)) {
snapshot.emplace(ntp, revision_id);
continue;
}
}
}
return snapshot;
}

ss::future<> controller_backend::start() {
setup_metrics();
return bootstrap_controller_backend().then([this] {
if (ss::this_shard_id() == cluster::controller_stm_shard) {
auto bootstrap_revision = _topics.local().last_applied_revision();
auto snapshot = create_topic_table_snapshot(_topics, _self);
ssx::spawn_with_gate(
_gate,
[this, bootstrap_revision, snapshot = std::move(snapshot)] {
return clear_orphan_topic_files(
bootstrap_revision, std::move(snapshot))
.handle_exception(
[](const std::exception_ptr& err) {
vlog(
clusterlog.error,
"Exception while cleaning oprhan files {}",
err);
});
});
}
start_topics_reconciliation_loop();
_housekeeping_timer.set_callback([this] { housekeeping(); });
_housekeeping_timer.arm(_housekeeping_timer_interval);
Expand Down Expand Up @@ -570,6 +638,56 @@ ss::future<> controller_backend::fetch_deltas() {
});
}

/**
* Topic files is qualified as orphan if we don't have it in topic table
* or it's revision is less than revision it topic table
* And it's revision is less than topic table snapshot revision
*/
bool topic_files_are_orphan(
const model::ntp& ntp,
storage::partition_path::metadata ntp_directory_data,
const absl::flat_hash_map<model::ntp, model::revision_id>&
topic_table_snapshot,
model::revision_id last_applied_revision) {
vlog(clusterlog.debug, "Checking topic files for ntp {} are orphan", ntp);

if (ntp_directory_data.revision_id > last_applied_revision) {
return false;
}
if (
topic_table_snapshot.contains(ntp)
&& ntp_directory_data.revision_id
>= topic_table_snapshot.find(ntp)->second) {
return false;
}
return true;
}

ss::future<> controller_backend::clear_orphan_topic_files(
model::revision_id bootstrap_revision,
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot) {
vlog(
clusterlog.info,
"Cleaning up orphan topic files. bootstrap_revision: {}",
bootstrap_revision);
// Init with default namespace to clean if there is no topics
absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
for (const auto& t : _topics.local().all_topics()) {
namespaces.emplace(t.ns);
}

return _storage.local().log_mgr().remove_orphan_files(
_data_directory,
std::move(namespaces),
[&,
bootstrap_revision,
topic_table_snapshot = std::move(topic_table_snapshot)](
model::ntp ntp, storage::partition_path::metadata p) {
return topic_files_are_orphan(
ntp, p, topic_table_snapshot, bootstrap_revision);
});
}

void controller_backend::start_topics_reconciliation_loop() {
ssx::spawn_with_gate(_gate, [this] {
return ss::do_until(
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ class controller_backend

// Topics
ss::future<> bootstrap_controller_backend();
/**
* Function that will clean orphan topic files on redpanda startup
* Orphan topic files is files that left on disk after some node
* manipullations and redpanda doesn't know about these files, so it is
* impossible to remove them with default approach. Currently we may leave
* orphan topic files when we restart redpanda while partition deletion
* operation was evaluating but hasn't finished yet.
* We assume that we can leave orphan files only on redpanda restart
* so we run clean process on startup, after bootstrap when redpanda
* already knows about all topics that it should containt on disk
**/
ss::future<> clear_orphan_topic_files(
model::revision_id bootstrap_revision,
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot);
void start_topics_reconciliation_loop();

ss::future<> fetch_deltas();
Expand Down
20 changes: 20 additions & 0 deletions src/v/storage/fs_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ class segment_full_path;

class partition_path {
public:
struct metadata {
model::partition_id partition_id;
model::revision_id revision_id;
};

/// Parse ntp directory name
static std::optional<metadata>
parse_partition_directory(const ss::sstring& name) {
const std::regex re(R"(^(\d+)_(\d+)$)");
std::cmatch match;
if (!std::regex_match(name.c_str(), match, re)) {
return std::nullopt;
}
return metadata{
.partition_id = model::partition_id(
boost::lexical_cast<uint64_t>(match[1].str())),
.revision_id = model::revision_id(
boost::lexical_cast<uint64_t>(match[2].str()))};
}

partition_path(const ntp_config& ntpc);

operator std::filesystem::path() const { return make_path(); }
Expand Down
95 changes: 95 additions & 0 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

#include "storage/log_manager.h"

#include "cluster/cluster_utils.h"
#include "cluster/topic_table.h"
#include "config/configuration.h"
#include "likely.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timestamp.h"
#include "resource_mgmt/io_priority.h"
#include "ssx/async-clear.h"
Expand Down Expand Up @@ -46,6 +49,7 @@
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>

#include <boost/algorithm/string/predicate.hpp>
#include <fmt/format.h>
Expand Down Expand Up @@ -437,6 +441,97 @@ ss::future<> log_manager::remove(model::ntp ntp) {
co_await dispatch_topic_dir_deletion(topic_dir);
}

ss::future<> remove_orphan_partition_files(
ss::sstring topic_directory_path,
model::topic_namespace nt,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>&
orphan_filter) {
return directory_walker::walk(
topic_directory_path,
[topic_directory_path, nt, &orphan_filter](
ss::directory_entry entry) -> ss::future<> {
auto ntp_directory_data = partition_path::parse_partition_directory(
entry.name);
if (!ntp_directory_data) {
return ss::now();
}

auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id);
if (orphan_filter(ntp, *ntp_directory_data)) {
auto ntp_directory = std::filesystem::path(topic_directory_path)
/ std::filesystem::path(entry.name);
vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory);
return ss::recursive_remove_directory(ntp_directory)
.handle_exception_type([ntp_directory](
std::filesystem::
filesystem_error const& err) {
vlog(
stlog.error,
"Exception while cleaning oprhan files for {} Error: {}",
ntp_directory,
err);
});
}
return ss::now();
});
}

ss::future<> log_manager::remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
orphan_filter) {
auto data_directory_exist = co_await ss::file_exists(data_directory_path);
if (!data_directory_exist) {
co_return;
}

for (const auto& ns : namespaces) {
auto namespace_directory = std::filesystem::path(data_directory_path)
/ std::filesystem::path(ss::sstring(ns));
auto namespace_directory_exist = co_await ss::file_exists(
namespace_directory.string());
if (!namespace_directory_exist) {
continue;
}
co_await directory_walker::walk(
namespace_directory.string(),
[this, &namespace_directory, &ns, &orphan_filter](
ss::directory_entry entry) -> ss::future<> {
if (entry.type != ss::directory_entry_type::directory) {
return ss::now();
}
auto topic_directory = namespace_directory
/ std::filesystem::path(entry.name);
return remove_orphan_partition_files(
topic_directory.string(),
model::topic_namespace(ns, model::topic(entry.name)),
orphan_filter)
.then([this, topic_directory]() {
vlog(
stlog.info,
"Trying to clean up topic directory {} ",
topic_directory);
return dispatch_topic_dir_deletion(
topic_directory.string());
})
.handle_exception_type(
[](std::filesystem::filesystem_error const& err) {
vlog(
stlog.error,
"Exception while cleaning oprhan files {}",
err);
});
})
.handle_exception_type(
[](std::filesystem::filesystem_error const& err) {
vlog(
stlog.error, "Exception while cleaning oprhan files {}", err);
});
}
co_return;
}

ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) {
return ss::smp::submit_to(
0,
Expand Down
12 changes: 12 additions & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

#pragma once

#include "cluster/topic_table.h"
#include "config/property.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "random/simple_time_jitter.h"
#include "seastarx.h"
#include "storage/batch_cache.h"
Expand Down Expand Up @@ -174,6 +176,16 @@ class log_manager {
* rebalancing partitions across the cluster, etc...
*/
ss::future<> remove(model::ntp);
/**
* This function walsk through entire directory structure in an async fiber
* to remove all orphan files in that directory. It checks if file is orphan
* with special orphan_filter
*/
ss::future<> remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
orphan_filter);

ss::future<> stop();

Expand Down

0 comments on commit 6f9be07

Please sign in to comment.