Revert "Clear topics orphan files" #8932

Merged: 1 commit, Feb 16, 2023
97 changes: 14 additions & 83 deletions src/v/cluster/controller_backend.cc
@@ -56,11 +56,19 @@
namespace cluster {
namespace {

inline bool contains_node(
model::node_id id, const std::vector<model::broker_shard>& replicas) {
return std::any_of(
replicas.cbegin(), replicas.cend(), [id](const model::broker_shard& bs) {
return bs.node_id == id;
});
}

bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) {
if (!delta.previous_replica_set) {
return false;
}
return contains_node(*delta.previous_replica_set, self)
return contains_node(self, *delta.previous_replica_set)
&& !has_local_replicas(self, *delta.previous_replica_set);
}

@@ -186,7 +194,7 @@ std::error_code check_configuration_update(
change_revision);
return errc::partition_configuration_revision_not_updated;
}
const bool includes_self = contains_node(bs, self);
const bool includes_self = contains_node(self, bs);

/*
* if configuration includes current node, we expect configuration to be
@@ -292,19 +300,6 @@ void controller_backend::setup_metrics() {
ss::future<> controller_backend::start() {
setup_metrics();
return bootstrap_controller_backend().then([this] {
if (ss::this_shard_id() == cluster::controller_stm_shard) {
auto bootstrap_revision = _topics.local().last_applied_revision();
ssx::spawn_with_gate(_gate, [this, bootstrap_revision] {
return clear_orphan_topic_files(bootstrap_revision)
.handle_exception_type(
[](std::filesystem::filesystem_error const& err) {
vlog(
clusterlog.error,
"Exception while cleaning oprhan files {}",
err);
});
});
}
start_topics_reconciliation_loop();
_housekeeping_timer.set_callback([this] { housekeeping(); });
_housekeeping_timer.arm(_housekeeping_timer_interval);
@@ -541,7 +536,7 @@ controller_backend::bootstrap_ntp(const model::ntp& ntp, deltas_t& deltas) {
}
return md.delta.type == op_t::update_finished
&& !contains_node(
md.delta.new_assignment.replicas, _self);
_self, md.delta.new_assignment.replicas);
});

vassert(
@@ -571,7 +566,7 @@ controller_backend::bootstrap_ntp(const model::ntp& ntp, deltas_t& deltas) {
* first operation that created replica on current node
*
*/
if (!contains_node(it->delta.new_assignment.replicas, _self)) {
if (!contains_node(_self, it->delta.new_assignment.replicas)) {
vassert(
it != deltas.rbegin(),
"operation {} must have following operation that created a "
@@ -615,70 +610,6 @@ ss::future<> controller_backend::fetch_deltas() {
});
}

bool topic_files_are_orphan(
const model::ntp& ntp,
storage::partition_path::metadata ntp_directory_data,
ss::sharded<cluster::topic_table>& _topics,
model::revision_id last_applied_revision,
model::node_id current_node) {
if (ntp_directory_data.revision_id > last_applied_revision) {
return false;
}
auto ntp_view = model::topic_namespace_view(ntp);
if (!_topics.local().contains(ntp_view)) {
return true;
}
auto ntp_meta = _topics.local().get_topic_metadata(ntp_view);
if (ntp_meta && !ntp_meta->is_topic_replicable()) {
return false;
}
if (ntp_meta && ntp_meta->get_revision() > ntp_directory_data.revision_id) {
return true;
}

auto current_replica_set = _topics.local().get_partition_assignment(ntp);
if (cluster::contains_node(current_replica_set->replicas, current_node)) {
return false;
}

auto target_replica_set = _topics.local().get_target_replica_set(ntp);
if (
target_replica_set
&& cluster::contains_node(*target_replica_set, current_node)) {
return false;
}

auto previous_replica_set = _topics.local().get_previous_replica_set(ntp);
if (
previous_replica_set
&& cluster::contains_node(*previous_replica_set, current_node)) {
return false;
}
return true;
}

ss::future<> controller_backend::clear_orphan_topic_files(
model::revision_id bootstrap_revision) {
vlog(
clusterlog.info,
"Cleaning up orphan topic files. bootstrap_revision: {}",
bootstrap_revision);
// Init with default namespace to clean if there are no topics
absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
for (const auto& t : _topics.local().all_topics()) {
namespaces.emplace(t.ns);
}

return _storage.local().log_mgr().remove_orphan_files(
_data_directory,
std::move(namespaces),
[&, bootstrap_revision](
model::ntp ntp, storage::partition_path::metadata p) {
return topic_files_are_orphan(
ntp, p, _topics, bootstrap_revision, _self);
});
}

void controller_backend::start_topics_reconciliation_loop() {
ssx::spawn_with_gate(_gate, [this] {
return ss::do_until(
@@ -1083,7 +1014,7 @@ controller_backend::process_partition_reconfiguration(
* 1) shutdown partition instance
* 2) create instance on target remote core
*/
if (contains_node(target_assignment.replicas, _self)) {
if (contains_node(_self, target_assignment.replicas)) {
co_return co_await shutdown_on_current_shard(std::move(ntp), rev);
}

@@ -1107,7 +1038,7 @@
// Wait for the operation to be finished on one of the nodes
co_return errc::waiting_for_reconfiguration_finish;
}
const auto cross_core_move = contains_node(previous_replicas, _self)
const auto cross_core_move = contains_node(_self, previous_replicas)
&& !has_local_replicas(
_self, previous_replicas);
/**
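For context on what is being reverted here: `topic_files_are_orphan` (removed above) decided whether an on-disk partition directory still had an owner by checking, in order, the directory's revision against the bootstrap revision, whether the topic still exists, the topic's revision, and finally whether the current, target, or previous replica set still references the local node. Below is a simplified, standalone sketch of that decision chain; the `partition_state` struct and the plain `int`/`int64_t` ids are hypothetical stand-ins for the real `cluster::topic_table` queries and `model::*` types, and the replicability check is omitted.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical, flattened view of what the topic table knows about one
// partition; the real code queries cluster::topic_table instead.
struct partition_state {
    bool topic_exists = false;
    int64_t topic_revision = 0;
    std::vector<int> current_replicas;
    std::optional<std::vector<int>> target_replicas;
    std::optional<std::vector<int>> previous_replicas;
};

static bool contains_node(int node, const std::vector<int>& replicas) {
    return std::find(replicas.begin(), replicas.end(), node) != replicas.end();
}

// Mirrors the shape of the reverted topic_files_are_orphan(), minus the
// replicability check and the real model types.
static bool topic_files_are_orphan(
  int64_t dir_revision,
  int64_t last_applied_revision,
  const partition_state& st,
  int self) {
    if (dir_revision > last_applied_revision) {
        return false; // directory may belong to an operation still in flight
    }
    if (!st.topic_exists) {
        return true; // topic is no longer known at all
    }
    if (st.topic_revision > dir_revision) {
        return true; // topic was re-created with a newer revision
    }
    if (contains_node(self, st.current_replicas)) {
        return false;
    }
    if (st.target_replicas && contains_node(self, *st.target_replicas)) {
        return false;
    }
    if (st.previous_replicas && contains_node(self, *st.previous_replicas)) {
        return false;
    }
    return true; // no replica set references this node anymore
}

int main() {
    partition_state st;
    st.topic_exists = true;
    st.topic_revision = 10;
    st.current_replicas = {1, 2, 3};
    // Directory revision 10, bootstrap revision 42, local node 5: node 5 is
    // in no replica set, so the directory counts as orphaned.
    std::cout << std::boolalpha
              << topic_files_are_orphan(10, 42, st, 5) << '\n'; // true
}

With the revert, this whole path is gone: startup no longer scans the data directory for orphaned partition directories at all.
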
2 changes: 0 additions & 2 deletions src/v/cluster/controller_backend.h
@@ -268,8 +268,6 @@ class controller_backend

// Topics
ss::future<> bootstrap_controller_backend();
ss::future<>
clear_orphan_topic_files(model::revision_id bootstrap_revision);
void start_topics_reconciliation_loop();

ss::future<> fetch_deltas();
20 changes: 0 additions & 20 deletions src/v/storage/fs_utils.h
@@ -67,26 +67,6 @@ class segment_full_path;

class partition_path {
public:
struct metadata {
model::partition_id partition_id;
model::revision_id revision_id;
};

/// Parse ntp directory name
static std::optional<metadata>
parse_partition_directory(const ss::sstring& name) {
const std::regex re(R"(^(\d+)_(\d+)$)");
std::cmatch match;
if (!std::regex_match(name.c_str(), match, re)) {
return std::nullopt;
}
return metadata{
.partition_id = model::partition_id(
boost::lexical_cast<uint64_t>(match[1].str())),
.revision_id = model::revision_id(
boost::lexical_cast<uint64_t>(match[2].str()))};
}

partition_path(const ntp_config& ntpc);

operator std::filesystem::path() const { return make_path(); }
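The `parse_partition_directory` helper removed here interprets a partition directory name as `<partition_id>_<revision_id>` via the regex `^(\d+)_(\d+)$`. A minimal standalone sketch of the same parsing, with plain `uint64_t` fields standing in for `model::partition_id` / `model::revision_id` and `std::stoull` in place of `boost::lexical_cast` (simplifications for illustration only):

#include <cstdint>
#include <iostream>
#include <optional>
#include <regex>
#include <string>

// Simplified stand-in for storage::partition_path::metadata.
struct partition_dir_metadata {
    uint64_t partition_id;
    uint64_t revision_id;
};

// Parse a directory name of the form "<partition>_<revision>", e.g. "0_42".
std::optional<partition_dir_metadata>
parse_partition_directory(const std::string& name) {
    static const std::regex re(R"(^(\d+)_(\d+)$)");
    std::smatch match;
    if (!std::regex_match(name, match, re)) {
        return std::nullopt;
    }
    return partition_dir_metadata{
      .partition_id = std::stoull(match[1].str()),
      .revision_id = std::stoull(match[2].str())};
}

int main() {
    for (const auto* name : {"0_42", "12_7", "not_a_partition"}) {
        if (auto md = parse_partition_directory(name)) {
            std::cout << name << " -> partition " << md->partition_id
                      << ", revision " << md->revision_id << '\n';
        } else {
            std::cout << name << " -> not a partition directory\n";
        }
    }
}

A name such as `0_42` therefore decodes to partition 0, revision 42, which is what the removed orphan filter compared against the topic table.
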
83 changes: 0 additions & 83 deletions src/v/storage/log_manager.cc
@@ -9,12 +9,9 @@

#include "storage/log_manager.h"

#include "cluster/cluster_utils.h"
#include "cluster/topic_table.h"
#include "config/configuration.h"
#include "likely.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timestamp.h"
#include "resource_mgmt/io_priority.h"
#include "ssx/async-clear.h"
@@ -49,7 +46,6 @@
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>

#include <boost/algorithm/string/predicate.hpp>
#include <fmt/format.h>
@@ -440,85 +436,6 @@ ss::future<> log_manager::remove(model::ntp ntp) {
co_await dispatch_topic_dir_deletion(topic_dir);
}

ss::future<> remove_orphan_partition_files(
ss::sstring topic_directory_path,
model::topic_namespace nt,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>&
orphan_filter) {
return directory_walker::walk(
topic_directory_path,
[topic_directory_path, nt, &orphan_filter](
ss::directory_entry entry) -> ss::future<> {
auto ntp_directory_data = partition_path::parse_partition_directory(
entry.name);
if (!ntp_directory_data) {
return ss::now();
}

auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id);
if (orphan_filter(ntp, *ntp_directory_data)) {
auto ntp_directory = std::filesystem::path(topic_directory_path)
/ std::filesystem::path(entry.name);
vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory);
return ss::recursive_remove_directory(ntp_directory)
.handle_exception_type([ntp_directory](
std::filesystem::
filesystem_error const& err) {
vlog(
stlog.error,
"Exception while cleaning oprhan files for {} Error: {}",
ntp_directory,
err);
});
}
return ss::now();
});
}

ss::future<> log_manager::remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
orphan_filter) {
auto data_directory_exist = co_await ss::file_exists(data_directory_path);
if (!data_directory_exist) {
co_return;
}

for (const auto& ns : namespaces) {
auto namespace_directory = std::filesystem::path(data_directory_path)
/ std::filesystem::path(ss::sstring(ns));
auto namespace_directory_exist = co_await ss::file_exists(
namespace_directory.string());
if (!namespace_directory_exist) {
continue;
}
co_await directory_walker::walk(
namespace_directory.string(),
[this, &namespace_directory, &ns, &orphan_filter](
ss::directory_entry entry) -> ss::future<> {
if (entry.type != ss::directory_entry_type::directory) {
return ss::now();
}
auto topic_directory = namespace_directory
/ std::filesystem::path(entry.name);
return remove_orphan_partition_files(
topic_directory.string(),
model::topic_namespace(ns, model::topic(entry.name)),
orphan_filter)
.then([this, topic_directory]() {
vlog(
stlog.info,
"Trying to clean up topic directory {} ",
topic_directory);
return dispatch_topic_dir_deletion(
topic_directory.string());
});
});
}
co_return;
}

ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) {
return ss::smp::submit_to(
0,
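The removed `remove_orphan_files` / `remove_orphan_partition_files` pair walked `data_dir/<namespace>/<topic>/<partition>_<revision>` and deleted any partition directory the caller's filter flagged as orphaned. A rough synchronous sketch of that traversal using `std::filesystem` and a plain callback, in place of Seastar's `directory_walker` and futures; the layout, names, and example path are assumptions for illustration, not the real API:

#include <filesystem>
#include <functional>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// Walk data_dir/<namespace>/<topic>/<partition>_<revision> and remove every
// partition directory the filter flags as orphaned. Mirrors the shape of the
// reverted code, minus Seastar futures, logging detail, and error handling.
void remove_orphan_partition_dirs(
  const fs::path& data_dir,
  const std::function<
    bool(const std::string&, const std::string&, const std::string&)>&
    orphan_filter) {
    if (!fs::is_directory(data_dir)) {
        return;
    }
    for (const auto& ns_entry : fs::directory_iterator(data_dir)) {
        if (!ns_entry.is_directory()) {
            continue;
        }
        for (const auto& topic_entry : fs::directory_iterator(ns_entry)) {
            if (!topic_entry.is_directory()) {
                continue;
            }
            for (const auto& part_entry : fs::directory_iterator(topic_entry)) {
                if (!part_entry.is_directory()) {
                    continue;
                }
                auto ns = ns_entry.path().filename().string();
                auto topic = topic_entry.path().filename().string();
                auto part = part_entry.path().filename().string();
                if (orphan_filter(ns, topic, part)) {
                    std::cout << "Cleaning up ntp directory "
                              << part_entry.path() << '\n';
                    fs::remove_all(part_entry.path());
                }
            }
        }
    }
}

int main() {
    // Dry run over a hypothetical data directory: the filter never matches,
    // so nothing is removed.
    remove_orphan_partition_dirs(
      "/var/lib/redpanda/data",
      [](const std::string&, const std::string&, const std::string&) {
          return false;
      });
}

In the reverted version, the decision about what counts as orphaned lived in the controller backend's `topic_files_are_orphan` predicate shown earlier; the log manager only supplied the directory walk and the deletion.
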
7 changes: 0 additions & 7 deletions src/v/storage/log_manager.h
@@ -11,11 +11,9 @@

#pragma once

#include "cluster/topic_table.h"
#include "config/property.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "random/simple_time_jitter.h"
#include "seastarx.h"
#include "storage/batch_cache.h"
@@ -176,11 +174,6 @@ class log_manager {
* rebalancing partitions across the cluster, etc...
*/
ss::future<> remove(model::ntp);
ss::future<> remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
orphan_filter);

ss::future<> stop();
