diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index f5fd7b93e5ff..700fbf2a6ccc 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -56,11 +56,19 @@ namespace cluster {
 
 namespace {
 
+inline bool contains_node(
+  model::node_id id, const std::vector<model::broker_shard>& replicas) {
+    return std::any_of(
+      replicas.cbegin(), replicas.cend(), [id](const model::broker_shard& bs) {
+          return bs.node_id == id;
+      });
+}
+
 bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) {
     if (!delta.previous_replica_set) {
         return false;
     }
-    return contains_node(*delta.previous_replica_set, self)
+    return contains_node(self, *delta.previous_replica_set)
            && !has_local_replicas(self, *delta.previous_replica_set);
 }
 
@@ -186,7 +194,7 @@ std::error_code check_configuration_update(
           change_revision);
         return errc::partition_configuration_revision_not_updated;
     }
-    const bool includes_self = contains_node(bs, self);
+    const bool includes_self = contains_node(self, bs);
 
     /*
      * if configuration includes current node, we expect configuration to be
@@ -292,19 +300,6 @@ void controller_backend::setup_metrics() {
 ss::future<> controller_backend::start() {
     setup_metrics();
     return bootstrap_controller_backend().then([this] {
-        if (ss::this_shard_id() == cluster::controller_stm_shard) {
-            auto bootstrap_revision = _topics.local().last_applied_revision();
-            ssx::spawn_with_gate(_gate, [this, bootstrap_revision] {
-                return clear_orphan_topic_files(bootstrap_revision)
-                  .handle_exception_type(
-                    [](std::filesystem::filesystem_error const& err) {
-                        vlog(
-                          clusterlog.error,
-                          "Exception while cleaning oprhan files {}",
-                          err);
-                    });
-            });
-        }
         start_topics_reconciliation_loop();
         _housekeeping_timer.set_callback([this] { housekeeping(); });
         _housekeeping_timer.arm(_housekeeping_timer_interval);
@@ -541,7 +536,7 @@ controller_backend::bootstrap_ntp(const model::ntp& ntp, deltas_t& deltas) {
           }
           return md.delta.type == op_t::update_finished
                  && !contains_node(
-                   md.delta.new_assignment.replicas, _self);
+                   _self, md.delta.new_assignment.replicas);
       });
 
     vassert(
@@ -571,7 +566,7 @@ controller_backend::bootstrap_ntp(const model::ntp& ntp, deltas_t& deltas) {
      * first operation that created replica on current node
      *
      */
-    if (!contains_node(it->delta.new_assignment.replicas, _self)) {
+    if (!contains_node(_self, it->delta.new_assignment.replicas)) {
         vassert(
           it != deltas.rbegin(),
           "operation {} must have following operation that created a "
@@ -615,70 +610,6 @@ ss::future<> controller_backend::fetch_deltas() {
       });
 }
 
-bool topic_files_are_orphan(
-  const model::ntp& ntp,
-  storage::partition_path::metadata ntp_directory_data,
-  ss::sharded<cluster::topic_table>& _topics,
-  model::revision_id last_applied_revision,
-  model::node_id current_node) {
-    if (ntp_directory_data.revision_id > last_applied_revision) {
-        return false;
-    }
-    auto ntp_view = model::topic_namespace_view(ntp);
-    if (!_topics.local().contains(ntp_view)) {
-        return true;
-    }
-    auto ntp_meta = _topics.local().get_topic_metadata(ntp_view);
-    if (ntp_meta && !ntp_meta->is_topic_replicable()) {
-        return false;
-    }
-    if (ntp_meta && ntp_meta->get_revision() > ntp_directory_data.revision_id) {
-        return true;
-    }
-
-    auto current_replica_set = _topics.local().get_partition_assignment(ntp);
-    if (cluster::contains_node(current_replica_set->replicas, current_node)) {
-        return false;
-    }
-
-    auto target_replica_set = _topics.local().get_target_replica_set(ntp);
-    if (
-      target_replica_set
-      && cluster::contains_node(*target_replica_set, current_node)) {
-        return false;
-    }
-
-    auto previous_replica_set = _topics.local().get_previous_replica_set(ntp);
-    if (
-      previous_replica_set
-      && cluster::contains_node(*previous_replica_set, current_node)) {
-        return false;
-    }
-    return true;
-}
-
-ss::future<> controller_backend::clear_orphan_topic_files(
-  model::revision_id bootstrap_revision) {
-    vlog(
-      clusterlog.info,
-      "Cleaning up orphan topic files. bootstrap_revision: {}",
-      bootstrap_revision);
-    // Init with default namespace to clean if there is no topics
-    absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
-    for (const auto& t : _topics.local().all_topics()) {
-        namespaces.emplace(t.ns);
-    }
-
-    return _storage.local().log_mgr().remove_orphan_files(
-      _data_directory,
-      std::move(namespaces),
-      [&, bootstrap_revision](
-        model::ntp ntp, storage::partition_path::metadata p) {
-          return topic_files_are_orphan(
-            ntp, p, _topics, bootstrap_revision, _self);
-      });
-}
-
 void controller_backend::start_topics_reconciliation_loop() {
     ssx::spawn_with_gate(_gate, [this] {
         return ss::do_until(
@@ -1083,7 +1014,7 @@ controller_backend::process_partition_reconfiguration(
      * 1) shutdown partition instance
      * 2) create instance on target remote core
      */
-    if (contains_node(target_assignment.replicas, _self)) {
+    if (contains_node(_self, target_assignment.replicas)) {
         co_return co_await shutdown_on_current_shard(std::move(ntp), rev);
     }
 
@@ -1107,7 +1038,7 @@ controller_backend::process_partition_reconfiguration(
         // Wait fo the operation to be finished on one of the nodes
         co_return errc::waiting_for_reconfiguration_finish;
     }
-    const auto cross_core_move = contains_node(previous_replicas, _self)
+    const auto cross_core_move = contains_node(_self, previous_replicas)
                                  && !has_local_replicas(
                                    _self, previous_replicas);
     /**
diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h
index 47f6de9cedb7..43ac152b42a9 100644
--- a/src/v/cluster/controller_backend.h
+++ b/src/v/cluster/controller_backend.h
@@ -268,8 +268,6 @@ class controller_backend
 
     // Topics
     ss::future<> bootstrap_controller_backend();
-    ss::future<>
-    clear_orphan_topic_files(model::revision_id bootstrap_revision);
     void start_topics_reconciliation_loop();
 
     ss::future<> fetch_deltas();
diff --git a/src/v/storage/fs_utils.h b/src/v/storage/fs_utils.h
index 50a4e40e3d14..2e5354dacaed 100644
--- a/src/v/storage/fs_utils.h
+++ b/src/v/storage/fs_utils.h
@@ -67,26 +67,6 @@ class segment_full_path;
 
 class partition_path {
 public:
-    struct metadata {
-        model::partition_id partition_id;
-        model::revision_id revision_id;
-    };
-
-    /// Parse ntp directory name
-    static std::optional<metadata>
-    parse_partition_directory(const ss::sstring& name) {
-        const std::regex re(R"(^(\d+)_(\d+)$)");
-        std::cmatch match;
-        if (!std::regex_match(name.c_str(), match, re)) {
-            return std::nullopt;
-        }
-        return metadata{
-          .partition_id = model::partition_id(
-            boost::lexical_cast<uint64_t>(match[1].str())),
-          .revision_id = model::revision_id(
-            boost::lexical_cast<uint64_t>(match[2].str()))};
-    }
-
     partition_path(const ntp_config& ntpc);
 
     operator std::filesystem::path() const { return make_path(); }
diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc
index e5c207019b37..08dcbcab800c 100644
--- a/src/v/storage/log_manager.cc
+++ b/src/v/storage/log_manager.cc
@@ -9,12 +9,9 @@
 
 #include "storage/log_manager.h"
 
-#include "cluster/cluster_utils.h"
-#include "cluster/topic_table.h"
 #include "config/configuration.h"
 #include "likely.h"
"model/fundamental.h" -#include "model/metadata.h" #include "model/timestamp.h" #include "resource_mgmt/io_priority.h" #include "ssx/async-clear.h" @@ -49,7 +46,6 @@ #include #include #include -#include #include #include @@ -440,85 +436,6 @@ ss::future<> log_manager::remove(model::ntp ntp) { co_await dispatch_topic_dir_deletion(topic_dir); } -ss::future<> remove_orphan_partition_files( - ss::sstring topic_directory_path, - model::topic_namespace nt, - ss::noncopyable_function& - orphan_filter) { - return directory_walker::walk( - topic_directory_path, - [topic_directory_path, nt, &orphan_filter]( - ss::directory_entry entry) -> ss::future<> { - auto ntp_directory_data = partition_path::parse_partition_directory( - entry.name); - if (!ntp_directory_data) { - return ss::now(); - } - - auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id); - if (orphan_filter(ntp, *ntp_directory_data)) { - auto ntp_directory = std::filesystem::path(topic_directory_path) - / std::filesystem::path(entry.name); - vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory); - return ss::recursive_remove_directory(ntp_directory) - .handle_exception_type([ntp_directory]( - std::filesystem:: - filesystem_error const& err) { - vlog( - stlog.error, - "Exception while cleaning oprhan files for {} Error: {}", - ntp_directory, - err); - }); - } - return ss::now(); - }); -} - -ss::future<> log_manager::remove_orphan_files( - ss::sstring data_directory_path, - absl::flat_hash_set namespaces, - ss::noncopyable_function - orphan_filter) { - auto data_directory_exist = co_await ss::file_exists(data_directory_path); - if (!data_directory_exist) { - co_return; - } - - for (const auto& ns : namespaces) { - auto namespace_directory = std::filesystem::path(data_directory_path) - / std::filesystem::path(ss::sstring(ns)); - auto namespace_directory_exist = co_await ss::file_exists( - namespace_directory.string()); - if (!namespace_directory_exist) { - continue; - } - co_await directory_walker::walk( - namespace_directory.string(), - [this, &namespace_directory, &ns, &orphan_filter]( - ss::directory_entry entry) -> ss::future<> { - if (entry.type != ss::directory_entry_type::directory) { - return ss::now(); - } - auto topic_directory = namespace_directory - / std::filesystem::path(entry.name); - return remove_orphan_partition_files( - topic_directory.string(), - model::topic_namespace(ns, model::topic(entry.name)), - orphan_filter) - .then([this, topic_directory]() { - vlog( - stlog.info, - "Trying to clean up topic directory {} ", - topic_directory); - return dispatch_topic_dir_deletion( - topic_directory.string()); - }); - }); - } - co_return; -} - ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) { return ss::smp::submit_to( 0, diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index cbe9f803fcc0..13f767152c56 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -11,11 +11,9 @@ #pragma once -#include "cluster/topic_table.h" #include "config/property.h" #include "features/feature_table.h" #include "model/fundamental.h" -#include "model/metadata.h" #include "random/simple_time_jitter.h" #include "seastarx.h" #include "storage/batch_cache.h" @@ -176,11 +174,6 @@ class log_manager { * rebalancing partitions across the cluster, etc... 
      */
     ss::future<> remove(model::ntp);
-    ss::future<> remove_orphan_files(
-      ss::sstring data_directory_path,
-      absl::flat_hash_set<model::ns> namespaces,
-      ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
-        orphan_filter);
 
     ss::future<> stop();
diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py
index 55763ee8fac0..831374c40467 100644
--- a/tests/rptest/tests/topic_delete_test.py
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -25,7 +25,6 @@
 from rptest.services.redpanda import CloudStorageType, SISettings
 from rptest.util import wait_for_local_storage_truncate, firewall_blocked
 from rptest.services.admin import Admin
-from rptest.tests.partition_movement import PartitionMovementMixin
 
 
 def get_kvstore_topic_key_counts(redpanda):
@@ -140,23 +139,16 @@ def __init__(self, test_context):
 
         self.kafka_tools = KafkaCliTools(self.redpanda)
 
-    def produce_until_partitions(self):
-        self.kafka_tools.produce(self.topic, 1024, 1024)
-        storage = self.redpanda.storage()
-        return len(list(storage.partitions("kafka", self.topic))) == 9
-
-    def dump_storage_listing(self):
-        for node in self.redpanda.nodes:
-            self.logger.error(f"Storage listing on {node.name}:")
-            for line in node.account.ssh_capture(
-                    f"find {self.redpanda.DATA_DIR}"):
-                self.logger.error(line.strip())
-
     @cluster(num_nodes=3)
     @parametrize(with_restart=False)
     @parametrize(with_restart=True)
     def topic_delete_test(self, with_restart):
-        wait_until(lambda: self.produce_until_partitions(),
+        def produce_until_partitions():
+            self.kafka_tools.produce(self.topic, 1024, 1024)
+            storage = self.redpanda.storage()
+            return len(list(storage.partitions("kafka", self.topic))) == 9
+
+        wait_until(lambda: produce_until_partitions(),
                    timeout_sec=30,
                    backoff_sec=2,
                    err_msg="Expected partition did not materialize")
@@ -180,178 +172,16 @@ def topic_delete_test(self, with_restart):
                        err_msg="Topic storage was not removed")
 
         except:
-            self.dump_storage_listing()
-            raise
-
-    @cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed'])
-    def topic_delete_orphan_files_test(self):
-        wait_until(lambda: self.produce_until_partitions(),
-                   timeout_sec=30,
-                   backoff_sec=2,
-                   err_msg="Expected partition did not materialize")
-
-        # Sanity check the kvstore checks: there should be at least one kvstore entry
-        # per partition while the topic exists.
-        assert sum(get_kvstore_topic_key_counts(
-            self.redpanda).values()) >= self.topics[0].partition_count
-
-        down_node = self.redpanda.nodes[-1]
-        try:
-            # Make topic directory immutable to prevent deleting
-            down_node.account.ssh(
-                f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")
-
-            self.kafka_tools.delete_topic(self.topic)
-
-            def topic_deleted_on_all_nodes_except_one(redpanda, down_node,
-                                                      topic_name):
-                storage = redpanda.storage()
-                log_not_removed_on_down = topic_name in next(
-                    filter(lambda x: x.name == down_node.name,
-                           storage.nodes)).ns["kafka"].topics
-                logs_removed_on_others = all(
-                    map(
-                        lambda n: topic_name not in n.ns["kafka"].topics,
-                        filter(lambda x: x.name != down_node.name,
-                               storage.nodes)))
-                return log_not_removed_on_down and logs_removed_on_others
-
-            try:
-                wait_until(
-                    lambda: topic_deleted_on_all_nodes_except_one(
-                        self.redpanda, down_node, self.topic),
-                    timeout_sec=30,
-                    backoff_sec=2,
-                    err_msg=
-                    "Topic storage was not removed from running nodes or removed from down node"
-                )
-            except:
-                self.dump_storage_listing()
-                raise
-
-            self.redpanda.stop_node(down_node)
-        finally:
-            down_node.account.ssh(
-                f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")
+            # On errors, dump listing of the storage location
+            for node in self.redpanda.nodes:
+                self.logger.error(f"Storage listing on {node.name}:")
+                for line in node.account.ssh_capture(
+                        f"find {self.redpanda.DATA_DIR}"):
+                    self.logger.error(line.strip())
 
-        self.redpanda.start_node(down_node)
-
-        try:
-            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
-                       timeout_sec=10,
-                       backoff_sec=2,
-                       err_msg="Topic storage was not removed")
-        except:
-            self.dump_storage_listing()
             raise
 
 
-class TopicDeleteAfterMovementTest(RedpandaTest):
-    """
-    Verify that topic deleted after partition movement.
- """ - partition_count = 3 - topics = (TopicSpec(partition_count=partition_count), ) - - def __init__(self, test_context): - super(TopicDeleteAfterMovementTest, - self).__init__(test_context=test_context, num_brokers=4) - - self.kafka_tools = KafkaCliTools(self.redpanda) - - def movement_done(self, partition, assignments): - results = [] - for n in self.redpanda._started: - info = self.admin.get_partitions(self.topic, partition, node=n) - self.logger.info( - f"current assignments for {self.topic}-{partition}: {info}") - converged = PartitionMovementMixin._equal_assignments( - info["replicas"], assignments) - results.append(converged and info["status"] == "done") - return all(results) - - def move_topic(self, assignments): - for partition in range(3): - - def get_nodes(partition): - return list(r['node_id'] for r in partition['replicas']) - - nodes_before = set( - get_nodes(self.admin.get_partitions(self.topic, partition))) - nodes_after = {r['node_id'] for r in assignments} - if nodes_before == nodes_after: - continue - self.admin.set_partition_replicas(self.topic, partition, - assignments) - - wait_until(lambda: self.movement_done(partition, assignments), - timeout_sec=60, - backoff_sec=2) - - @cluster(num_nodes=4, log_allow_list=[r'filesystem error: remove failed']) - def topic_delete_orphan_files_after_move_test(self): - - # Write out 10MB per partition - self.kafka_tools.produce(self.topic, - record_size=4096, - num_records=2560 * self.partition_count) - - self.admin = Admin(self.redpanda) - - # Move every partition to nodes 1,2,3 - assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]] - self.move_topic(assignments) - - down_node = self.redpanda.nodes[0] - try: - # Make topic directory immutable to prevent deleting - down_node.account.ssh( - f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") - - # Move every partition from node 1 to node 4 - new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]] - self.move_topic(new_assignments) - - def topic_exist_on_every_node(redpanda, topic_name): - storage = redpanda.storage() - exist_on_every = all( - map(lambda n: topic_name in n.ns["kafka"].topics, - storage.nodes)) - return exist_on_every - - wait_until( - lambda: topic_exist_on_every_node(self.redpanda, self.topic), - timeout_sec=30, - backoff_sec=2, - err_msg="Topic doesn't exist on some node") - - self.redpanda.stop_node(down_node) - finally: - down_node.account.ssh( - f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") - - self.redpanda.start_node(down_node) - - def topic_deleted_on_down_node_and_exist_on_others( - redpanda, down_node, topic_name): - storage = redpanda.storage() - log_removed_on_down = topic_name not in next( - filter(lambda x: x.name == down_node.name, - storage.nodes)).ns["kafka"].topics - logs_not_removed_on_others = all( - map(lambda n: topic_name in n.ns["kafka"].topics, - filter(lambda x: x.name != down_node.name, storage.nodes))) - return log_removed_on_down and logs_not_removed_on_others - - wait_until( - lambda: topic_deleted_on_down_node_and_exist_on_others( - self.redpanda, down_node, self.topic), - timeout_sec=30, - backoff_sec=2, - err_msg= - "Topic storage was not removed on down node or removed on other") - - class TopicDeleteCloudStorageTest(RedpandaTest): partition_count = 3 topics = (TopicSpec(partition_count=partition_count,