-
Notifications
You must be signed in to change notification settings - Fork 593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clear topics orphan files with fix #8960
Changes from all commits
cefbef2
2862c32
ab214ed
2eb0976
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ | |
#include <seastar/core/smp.hh> | ||
#include <seastar/util/later.hh> | ||
|
||
#include <absl/container/flat_hash_map.h> | ||
#include <absl/container/flat_hash_set.h> | ||
#include <absl/container/node_hash_map.h> | ||
|
||
|
@@ -56,20 +57,12 @@ | |
namespace cluster { | ||
namespace { | ||
|
||
inline bool contains_node( | ||
model::node_id id, const std::vector<model::broker_shard>& replicas) { | ||
return std::any_of( | ||
replicas.cbegin(), replicas.cend(), [id](const model::broker_shard& bs) { | ||
return bs.node_id == id; | ||
}); | ||
} | ||
|
||
bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) { | ||
if (!delta.previous_replica_set) { | ||
return false; | ||
} | ||
return has_local_replicas(self, delta.new_assignment.replicas) | ||
&& contains_node(self, *delta.previous_replica_set) | ||
&& contains_node(*delta.previous_replica_set, self) | ||
&& !has_local_replicas(self, *delta.previous_replica_set); | ||
} | ||
|
||
|
@@ -219,7 +212,7 @@ std::error_code check_configuration_update( | |
change_revision); | ||
return errc::partition_configuration_revision_not_updated; | ||
} | ||
const bool includes_self = contains_node(self, bs); | ||
const bool includes_self = contains_node(bs, self); | ||
|
||
/* | ||
* if configuration includes current node, we expect configuration to be | ||
|
@@ -326,10 +319,76 @@ void controller_backend::setup_metrics() { | |
"Number of partitions with ongoing/requested operations")), | ||
}); | ||
} | ||
/** | ||
 * Create a snapshot of the topic table that contains every ntp that | ||
 * must be present on local disk storage, together with its revision. | ||
 * This snapshot is used to identify orphan topic files: | ||
 * if a topic is not present in this snapshot, or its revision | ||
 * is less than the revision recorded in this snapshot, then the | ||
 * topic directory is an orphan. | ||
*/ | ||
static absl::flat_hash_map<model::ntp, model::revision_id> | ||
create_topic_table_snapshot( | ||
ss::sharded<cluster::topic_table>& topics, model::node_id current_node) { | ||
absl::flat_hash_map<model::ntp, model::revision_id> snapshot; | ||
|
||
for (const auto& nt : topics.local().all_topics()) { | ||
auto ntp_view = model::topic_namespace_view(nt); | ||
auto ntp_meta = topics.local().get_topic_metadata(ntp_view); | ||
if (!ntp_meta) { | ||
continue; | ||
} | ||
for (const auto& p : ntp_meta->get_assignments()) { | ||
auto ntp = model::ntp(nt.ns, nt.tp, p.id); | ||
if (!ntp_meta->is_topic_replicable()) { | ||
snapshot.emplace(ntp, 0); | ||
continue; | ||
} | ||
auto revision_id = ntp_meta->get_revision(); | ||
if (cluster::contains_node(p.replicas, current_node)) { | ||
snapshot.emplace(ntp, revision_id); | ||
continue; | ||
} | ||
auto target_replica_set = topics.local().get_target_replica_set( | ||
ntp); | ||
if ( | ||
target_replica_set | ||
&& cluster::contains_node(*target_replica_set, current_node)) { | ||
snapshot.emplace(ntp, revision_id); | ||
continue; | ||
} | ||
auto previous_replica_set = topics.local().get_previous_replica_set( | ||
ntp); | ||
if ( | ||
previous_replica_set | ||
&& cluster::contains_node(*previous_replica_set, current_node)) { | ||
snapshot.emplace(ntp, revision_id); | ||
continue; | ||
} | ||
} | ||
} | ||
return snapshot; | ||
} | ||
|
||
ss::future<> controller_backend::start() { | ||
setup_metrics(); | ||
return bootstrap_controller_backend().then([this] { | ||
if (ss::this_shard_id() == cluster::controller_stm_shard) { | ||
auto bootstrap_revision = _topics.local().last_applied_revision(); | ||
auto snapshot = create_topic_table_snapshot(_topics, _self); | ||
ssx::spawn_with_gate( | ||
_gate, | ||
[this, bootstrap_revision, snapshot = std::move(snapshot)] { | ||
return clear_orphan_topic_files( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add some comments around what exactly orphan files and are why we do it at startup? (for readers who don't look at this as a commit). |
||
bootstrap_revision, std::move(snapshot)) | ||
.handle_exception([](const std::exception_ptr& err) { | ||
vlog( | ||
clusterlog.error, | ||
"Exception while cleaning orphan files {}", | ||
err); | ||
}); | ||
}); | ||
} | ||
start_topics_reconciliation_loop(); | ||
_housekeeping_timer.set_callback([this] { housekeeping(); }); | ||
_housekeeping_timer.arm(_housekeeping_timer_interval); | ||
|
@@ -585,6 +644,56 @@ ss::future<> controller_backend::fetch_deltas() { | |
}); | ||
} | ||
|
||
/** | ||
 * Topic files are qualified as orphan if the ntp is absent from the | ||
 * topic table snapshot, or its on-disk revision is less than the | ||
 * revision recorded in the snapshot — and, in either case, its | ||
 * on-disk revision is not greater than the last applied revision. | ||
*/ | ||
bool topic_files_are_orphan( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logging in this function is a bit confusing to me. |
||
const model::ntp& ntp, | ||
storage::partition_path::metadata ntp_directory_data, | ||
const absl::flat_hash_map<model::ntp, model::revision_id>& | ||
topic_table_snapshot, | ||
model::revision_id last_applied_revision) { | ||
vlog(clusterlog.debug, "Checking topic files for ntp {} are orphan", ntp); | ||
|
||
if (ntp_directory_data.revision_id > last_applied_revision) { | ||
return false; | ||
} | ||
if ( | ||
topic_table_snapshot.contains(ntp) | ||
&& ntp_directory_data.revision_id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would be nice to document what qualifies as an orphan in the method doc. |
||
>= topic_table_snapshot.find(ntp)->second) { | ||
return false; | ||
} | ||
return true; | ||
} | ||
|
||
ss::future<> controller_backend::clear_orphan_topic_files( | ||
model::revision_id bootstrap_revision, | ||
absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot) { | ||
vlog( | ||
clusterlog.info, | ||
"Cleaning up orphan topic files. bootstrap_revision: {}", | ||
bootstrap_revision); | ||
// Init with default namespace to clean if there is no topics | ||
absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}}; | ||
for (const auto& t : _topics.local().all_topics()) { | ||
namespaces.emplace(t.ns); | ||
} | ||
|
||
return _storage.local().log_mgr().remove_orphan_files( | ||
_data_directory, | ||
std::move(namespaces), | ||
[&, | ||
bootstrap_revision, | ||
topic_table_snapshot = std::move(topic_table_snapshot)]( | ||
model::ntp ntp, storage::partition_path::metadata p) { | ||
return topic_files_are_orphan( | ||
ntp, p, topic_table_snapshot, bootstrap_revision); | ||
}); | ||
} | ||
|
||
void controller_backend::start_topics_reconciliation_loop() { | ||
ssx::spawn_with_gate(_gate, [this] { | ||
return ss::do_until( | ||
|
@@ -981,7 +1090,7 @@ controller_backend::process_partition_reconfiguration( | |
* 1) shutdown partition instance | ||
* 2) create instance on target remote core | ||
*/ | ||
if (contains_node(_self, target_assignment.replicas)) { | ||
if (contains_node(target_assignment.replicas, _self)) { | ||
co_return co_await shutdown_on_current_shard( | ||
std::move(ntp), command_rev); | ||
} | ||
|
@@ -1006,7 +1115,7 @@ controller_backend::process_partition_reconfiguration( | |
// Wait for the operation to finish on one of the nodes | ||
co_return errc::waiting_for_reconfiguration_finish; | ||
} | ||
const auto cross_core_move = contains_node(_self, previous_replicas) | ||
const auto cross_core_move = contains_node(previous_replicas, _self) | ||
&& !has_local_replicas( | ||
_self, previous_replicas); | ||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,9 +9,12 @@ | |
|
||
#include "storage/log_manager.h" | ||
|
||
#include "cluster/cluster_utils.h" | ||
#include "cluster/topic_table.h" | ||
#include "config/configuration.h" | ||
#include "likely.h" | ||
#include "model/fundamental.h" | ||
#include "model/metadata.h" | ||
#include "model/timestamp.h" | ||
#include "resource_mgmt/io_priority.h" | ||
#include "ssx/async-clear.h" | ||
|
@@ -46,6 +49,7 @@ | |
#include <seastar/core/with_scheduling_group.hh> | ||
#include <seastar/coroutine/maybe_yield.hh> | ||
#include <seastar/coroutine/parallel_for_each.hh> | ||
#include <seastar/util/file.hh> | ||
|
||
#include <boost/algorithm/string/predicate.hpp> | ||
#include <fmt/format.h> | ||
|
@@ -430,6 +434,97 @@ ss::future<> log_manager::remove(model::ntp ntp) { | |
co_await dispatch_topic_dir_deletion(topic_dir); | ||
} | ||
|
||
ss::future<> remove_orphan_partition_files( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I'd move this free function into the anon namespace ( |
||
ss::sstring topic_directory_path, | ||
model::topic_namespace nt, | ||
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>& | ||
orphan_filter) { | ||
return directory_walker::walk( | ||
topic_directory_path, | ||
[topic_directory_path, nt, &orphan_filter]( | ||
ss::directory_entry entry) -> ss::future<> { | ||
auto ntp_directory_data = partition_path::parse_partition_directory( | ||
entry.name); | ||
if (!ntp_directory_data) { | ||
return ss::now(); | ||
} | ||
|
||
auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id); | ||
if (orphan_filter(ntp, *ntp_directory_data)) { | ||
auto ntp_directory = std::filesystem::path(topic_directory_path) | ||
/ std::filesystem::path(entry.name); | ||
vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory); | ||
return ss::recursive_remove_directory(ntp_directory) | ||
.handle_exception_type([ntp_directory]( | ||
std::filesystem:: | ||
filesystem_error const& err) { | ||
vlog( | ||
stlog.error, | ||
"Exception while cleaning orphan files for {} Error: {}", | ||
ntp_directory, | ||
err); | ||
}); | ||
} | ||
return ss::now(); | ||
}); | ||
} | ||
|
||
ss::future<> log_manager::remove_orphan_files( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would be nice to mention that this walks thru the entire directory structure in an async fiber to remove... |
||
ss::sstring data_directory_path, | ||
absl::flat_hash_set<model::ns> namespaces, | ||
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)> | ||
orphan_filter) { | ||
auto data_directory_exist = co_await ss::file_exists(data_directory_path); | ||
if (!data_directory_exist) { | ||
co_return; | ||
} | ||
|
||
for (const auto& ns : namespaces) { | ||
auto namespace_directory = std::filesystem::path(data_directory_path) | ||
/ std::filesystem::path(ss::sstring(ns)); | ||
auto namespace_directory_exist = co_await ss::file_exists( | ||
namespace_directory.string()); | ||
if (!namespace_directory_exist) { | ||
continue; | ||
} | ||
co_await directory_walker::walk( | ||
namespace_directory.string(), | ||
[this, &namespace_directory, &ns, &orphan_filter]( | ||
ss::directory_entry entry) -> ss::future<> { | ||
if (entry.type != ss::directory_entry_type::directory) { | ||
return ss::now(); | ||
} | ||
auto topic_directory = namespace_directory | ||
/ std::filesystem::path(entry.name); | ||
return remove_orphan_partition_files( | ||
topic_directory.string(), | ||
model::topic_namespace(ns, model::topic(entry.name)), | ||
orphan_filter) | ||
.then([this, topic_directory]() { | ||
vlog( | ||
stlog.info, | ||
"Trying to clean up topic directory {} ", | ||
topic_directory); | ||
return dispatch_topic_dir_deletion( | ||
topic_directory.string()); | ||
}) | ||
.handle_exception_type( | ||
[](std::filesystem::filesystem_error const& err) { | ||
vlog( | ||
stlog.error, | ||
"Exception while cleaning orphan files {}", | ||
err); | ||
}); | ||
}) | ||
.handle_exception_type( | ||
[](std::filesystem::filesystem_error const& err) { | ||
vlog( | ||
stlog.error, "Exception while cleaning orphan files {}", err); | ||
}); | ||
} | ||
co_return; | ||
} | ||
|
||
ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) { | ||
return ss::smp::submit_to( | ||
0, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it will be helpful to have a comment why we make a snpashot and what exactly is in the snapshot.