diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 33fbc39f4ef35..3b436585dcf5d 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -43,6 +43,7 @@ #include #include +#include #include #include @@ -56,20 +57,12 @@ namespace cluster { namespace { -inline bool contains_node( - model::node_id id, const std::vector& replicas) { - return std::any_of( - replicas.cbegin(), replicas.cend(), [id](const model::broker_shard& bs) { - return bs.node_id == id; - }); -} - bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) { if (!delta.previous_replica_set) { return false; } return has_local_replicas(self, delta.new_assignment.replicas) - && contains_node(self, *delta.previous_replica_set) + && contains_node(*delta.previous_replica_set, self) && !has_local_replicas(self, *delta.previous_replica_set); } @@ -219,7 +212,7 @@ std::error_code check_configuration_update( change_revision); return errc::partition_configuration_revision_not_updated; } - const bool includes_self = contains_node(self, bs); + const bool includes_self = contains_node(bs, self); /* * if configuration includes current node, we expect configuration to be @@ -326,10 +319,76 @@ void controller_backend::setup_metrics() { "Number of partitions with ongoing/requested operations")), }); } +/** + * Create snapshot of topic table that contains all ntp that + * must be presented on local disk storage and their revision + * This snapshot will help to define orphan topic files + * If topic is not presented in this snapshot or its revision + * its revision is less than in this snapshot then this topic + * directory is orphan + */ +static absl::flat_hash_map +create_topic_table_snapshot( + ss::sharded& topics, model::node_id current_node) { + absl::flat_hash_map snapshot; + + for (const auto& nt : topics.local().all_topics()) { + auto ntp_view = model::topic_namespace_view(nt); + auto ntp_meta = topics.local().get_topic_metadata(ntp_view); + if (!ntp_meta) { + continue; + } + for (const auto& p : ntp_meta->get_assignments()) { + auto ntp = model::ntp(nt.ns, nt.tp, p.id); + if (!ntp_meta->is_topic_replicable()) { + snapshot.emplace(ntp, 0); + continue; + } + auto revision_id = ntp_meta->get_revision(); + if (cluster::contains_node(p.replicas, current_node)) { + snapshot.emplace(ntp, revision_id); + continue; + } + auto target_replica_set = topics.local().get_target_replica_set( + ntp); + if ( + target_replica_set + && cluster::contains_node(*target_replica_set, current_node)) { + snapshot.emplace(ntp, revision_id); + continue; + } + auto previous_replica_set = topics.local().get_previous_replica_set( + ntp); + if ( + previous_replica_set + && cluster::contains_node(*previous_replica_set, current_node)) { + snapshot.emplace(ntp, revision_id); + continue; + } + } + } + return snapshot; +} ss::future<> controller_backend::start() { setup_metrics(); return bootstrap_controller_backend().then([this] { + if (ss::this_shard_id() == cluster::controller_stm_shard) { + auto bootstrap_revision = _topics.local().last_applied_revision(); + auto snapshot = create_topic_table_snapshot(_topics, _self); + ssx::spawn_with_gate( + _gate, + [this, bootstrap_revision, snapshot = std::move(snapshot)] { + return clear_orphan_topic_files( + bootstrap_revision, std::move(snapshot)) + .handle_exception([](const std::exception_ptr& err) { + vlog( + clusterlog.error, + "Exception while cleaning oprhan files {}", + err); + }); + }); + } start_topics_reconciliation_loop(); _housekeeping_timer.set_callback([this] { housekeeping(); }); _housekeeping_timer.arm(_housekeeping_timer_interval); @@ -585,6 +644,56 @@ ss::future<> controller_backend::fetch_deltas() { }); } +/** + * Topic files is qualified as orphan if we don't have it in topic table + * or it's revision is less than revision it topic table + * And it's revision is less than topic table snapshot revision + */ +bool topic_files_are_orphan( + const model::ntp& ntp, + storage::partition_path::metadata ntp_directory_data, + const absl::flat_hash_map& + topic_table_snapshot, + model::revision_id last_applied_revision) { + vlog(clusterlog.debug, "Checking topic files for ntp {} are orphan", ntp); + + if (ntp_directory_data.revision_id > last_applied_revision) { + return false; + } + if ( + topic_table_snapshot.contains(ntp) + && ntp_directory_data.revision_id + >= topic_table_snapshot.find(ntp)->second) { + return false; + } + return true; +} + +ss::future<> controller_backend::clear_orphan_topic_files( + model::revision_id bootstrap_revision, + absl::flat_hash_map topic_table_snapshot) { + vlog( + clusterlog.info, + "Cleaning up orphan topic files. bootstrap_revision: {}", + bootstrap_revision); + // Init with default namespace to clean if there is no topics + absl::flat_hash_set namespaces = {{model::kafka_namespace}}; + for (const auto& t : _topics.local().all_topics()) { + namespaces.emplace(t.ns); + } + + return _storage.local().log_mgr().remove_orphan_files( + _data_directory, + std::move(namespaces), + [&, + bootstrap_revision, + topic_table_snapshot = std::move(topic_table_snapshot)]( + model::ntp ntp, storage::partition_path::metadata p) { + return topic_files_are_orphan( + ntp, p, topic_table_snapshot, bootstrap_revision); + }); +} + void controller_backend::start_topics_reconciliation_loop() { ssx::spawn_with_gate(_gate, [this] { return ss::do_until( @@ -981,7 +1090,7 @@ controller_backend::process_partition_reconfiguration( * 1) shutdown partition instance * 2) create instance on target remote core */ - if (contains_node(_self, target_assignment.replicas)) { + if (contains_node(target_assignment.replicas, _self)) { co_return co_await shutdown_on_current_shard( std::move(ntp), command_rev); } @@ -1006,7 +1115,7 @@ controller_backend::process_partition_reconfiguration( // Wait fo the operation to be finished on one of the nodes co_return errc::waiting_for_reconfiguration_finish; } - const auto cross_core_move = contains_node(_self, previous_replicas) + const auto cross_core_move = contains_node(previous_replicas, _self) && !has_local_replicas( _self, previous_replicas); /** diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index 99b2686ff305b..8822e16bd8eee 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -270,6 +270,20 @@ class controller_backend // Topics ss::future<> bootstrap_controller_backend(); + /** + * Function that will clean orphan topic files on redpanda startup + * Orphan topic files is files that left on disk after some node + * manipullations and redpanda doesn't know about these files, so it is + * impossible to remove them with default approach. Currently we may leave + * orphan topic files when we restart redpanda while partition deletion + * operation was evaluating but hasn't finished yet. + * We assume that we can leave orphan files only on redpanda restart + * so we run clean process on startup, after bootstrap when redpanda + * already knows about all topics that it should containt on disk + **/ + ss::future<> clear_orphan_topic_files( + model::revision_id bootstrap_revision, + absl::flat_hash_map topic_table_snapshot); void start_topics_reconciliation_loop(); ss::future<> fetch_deltas(); diff --git a/src/v/storage/fs_utils.h b/src/v/storage/fs_utils.h index 2e5354dacaed8..50a4e40e3d147 100644 --- a/src/v/storage/fs_utils.h +++ b/src/v/storage/fs_utils.h @@ -67,6 +67,26 @@ class segment_full_path; class partition_path { public: + struct metadata { + model::partition_id partition_id; + model::revision_id revision_id; + }; + + /// Parse ntp directory name + static std::optional + parse_partition_directory(const ss::sstring& name) { + const std::regex re(R"(^(\d+)_(\d+)$)"); + std::cmatch match; + if (!std::regex_match(name.c_str(), match, re)) { + return std::nullopt; + } + return metadata{ + .partition_id = model::partition_id( + boost::lexical_cast(match[1].str())), + .revision_id = model::revision_id( + boost::lexical_cast(match[2].str()))}; + } + partition_path(const ntp_config& ntpc); operator std::filesystem::path() const { return make_path(); } diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 8d91d478374af..cad7658a2453e 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -9,9 +9,12 @@ #include "storage/log_manager.h" +#include "cluster/cluster_utils.h" +#include "cluster/topic_table.h" #include "config/configuration.h" #include "likely.h" #include "model/fundamental.h" +#include "model/metadata.h" #include "model/timestamp.h" #include "resource_mgmt/io_priority.h" #include "ssx/async-clear.h" @@ -46,6 +49,7 @@ #include #include #include +#include #include #include @@ -430,6 +434,97 @@ ss::future<> log_manager::remove(model::ntp ntp) { co_await dispatch_topic_dir_deletion(topic_dir); } +ss::future<> remove_orphan_partition_files( + ss::sstring topic_directory_path, + model::topic_namespace nt, + ss::noncopyable_function& + orphan_filter) { + return directory_walker::walk( + topic_directory_path, + [topic_directory_path, nt, &orphan_filter]( + ss::directory_entry entry) -> ss::future<> { + auto ntp_directory_data = partition_path::parse_partition_directory( + entry.name); + if (!ntp_directory_data) { + return ss::now(); + } + + auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id); + if (orphan_filter(ntp, *ntp_directory_data)) { + auto ntp_directory = std::filesystem::path(topic_directory_path) + / std::filesystem::path(entry.name); + vlog(stlog.info, "Cleaning up ntp directory {} ", ntp_directory); + return ss::recursive_remove_directory(ntp_directory) + .handle_exception_type([ntp_directory]( + std::filesystem:: + filesystem_error const& err) { + vlog( + stlog.error, + "Exception while cleaning oprhan files for {} Error: {}", + ntp_directory, + err); + }); + } + return ss::now(); + }); +} + +ss::future<> log_manager::remove_orphan_files( + ss::sstring data_directory_path, + absl::flat_hash_set namespaces, + ss::noncopyable_function + orphan_filter) { + auto data_directory_exist = co_await ss::file_exists(data_directory_path); + if (!data_directory_exist) { + co_return; + } + + for (const auto& ns : namespaces) { + auto namespace_directory = std::filesystem::path(data_directory_path) + / std::filesystem::path(ss::sstring(ns)); + auto namespace_directory_exist = co_await ss::file_exists( + namespace_directory.string()); + if (!namespace_directory_exist) { + continue; + } + co_await directory_walker::walk( + namespace_directory.string(), + [this, &namespace_directory, &ns, &orphan_filter]( + ss::directory_entry entry) -> ss::future<> { + if (entry.type != ss::directory_entry_type::directory) { + return ss::now(); + } + auto topic_directory = namespace_directory + / std::filesystem::path(entry.name); + return remove_orphan_partition_files( + topic_directory.string(), + model::topic_namespace(ns, model::topic(entry.name)), + orphan_filter) + .then([this, topic_directory]() { + vlog( + stlog.info, + "Trying to clean up topic directory {} ", + topic_directory); + return dispatch_topic_dir_deletion( + topic_directory.string()); + }) + .handle_exception_type( + [](std::filesystem::filesystem_error const& err) { + vlog( + stlog.error, + "Exception while cleaning oprhan files {}", + err); + }); + }) + .handle_exception_type( + [](std::filesystem::filesystem_error const& err) { + vlog( + stlog.error, "Exception while cleaning oprhan files {}", err); + }); + } + co_return; +} + ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) { return ss::smp::submit_to( 0, diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 37b966a89829c..244fcae3cf1b8 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -11,9 +11,11 @@ #pragma once +#include "cluster/topic_table.h" #include "config/property.h" #include "features/feature_table.h" #include "model/fundamental.h" +#include "model/metadata.h" #include "random/simple_time_jitter.h" #include "seastarx.h" #include "storage/batch_cache.h" @@ -174,6 +176,16 @@ class log_manager { * rebalancing partitions across the cluster, etc... */ ss::future<> remove(model::ntp); + /** + * This function walsk through entire directory structure in an async fiber + * to remove all orphan files in that directory. It checks if file is orphan + * with special orphan_filter + */ + ss::future<> remove_orphan_files( + ss::sstring data_directory_path, + absl::flat_hash_set namespaces, + ss::noncopyable_function + orphan_filter); ss::future<> stop(); diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index 33d98f7065d25..ca26cf03f1b6f 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -26,6 +26,7 @@ from rptest.services.redpanda import CloudStorageType, SISettings, get_cloud_storage_type from rptest.util import wait_for_local_storage_truncate, firewall_blocked from rptest.services.admin import Admin +from rptest.tests.partition_movement import PartitionMovementMixin def get_kvstore_topic_key_counts(redpanda): @@ -140,16 +141,23 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) + def produce_until_partitions(self): + self.kafka_tools.produce(self.topic, 1024, 1024) + storage = self.redpanda.storage() + return len(list(storage.partitions("kafka", self.topic))) == 9 + + def dump_storage_listing(self): + for node in self.redpanda.nodes: + self.logger.error(f"Storage listing on {node.name}:") + for line in node.account.ssh_capture( + f"find {self.redpanda.DATA_DIR}"): + self.logger.error(line.strip()) + @cluster(num_nodes=3) @parametrize(with_restart=False) @parametrize(with_restart=True) def topic_delete_test(self, with_restart): - def produce_until_partitions(): - self.kafka_tools.produce(self.topic, 1024, 1024) - storage = self.redpanda.storage() - return len(list(storage.partitions("kafka", self.topic))) == 9 - - wait_until(lambda: produce_until_partitions(), + wait_until(lambda: self.produce_until_partitions(), timeout_sec=30, backoff_sec=2, err_msg="Expected partition did not materialize") @@ -173,16 +181,178 @@ def produce_until_partitions(): err_msg="Topic storage was not removed") except: - # On errors, dump listing of the storage location - for node in self.redpanda.nodes: - self.logger.error(f"Storage listing on {node.name}:") - for line in node.account.ssh_capture( - f"find {self.redpanda.DATA_DIR}"): - self.logger.error(line.strip()) + self.dump_storage_listing() + raise + + @cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed']) + def topic_delete_orphan_files_test(self): + wait_until(lambda: self.produce_until_partitions(), + timeout_sec=30, + backoff_sec=2, + err_msg="Expected partition did not materialize") + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + + down_node = self.redpanda.nodes[-1] + try: + # Make topic directory immutable to prevent deleting + down_node.account.ssh( + f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.kafka_tools.delete_topic(self.topic) + + def topic_deleted_on_all_nodes_except_one(redpanda, down_node, + topic_name): + storage = redpanda.storage() + log_not_removed_on_down = topic_name in next( + filter(lambda x: x.name == down_node.name, + storage.nodes)).ns["kafka"].topics + logs_removed_on_others = all( + map( + lambda n: topic_name not in n.ns["kafka"].topics, + filter(lambda x: x.name != down_node.name, + storage.nodes))) + return log_not_removed_on_down and logs_removed_on_others + + try: + wait_until( + lambda: topic_deleted_on_all_nodes_except_one( + self.redpanda, down_node, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg= + "Topic storage was not removed from running nodes or removed from down node" + ) + except: + self.dump_storage_listing() + raise + + self.redpanda.stop_node(down_node) + finally: + down_node.account.ssh( + f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + self.redpanda.start_node(down_node) + + try: + wait_until(lambda: topic_storage_purged(self.redpanda, self.topic), + timeout_sec=10, + backoff_sec=2, + err_msg="Topic storage was not removed") + except: + self.dump_storage_listing() raise +class TopicDeleteAfterMovementTest(RedpandaTest): + """ + Verify that topic deleted after partition movement. + """ + partition_count = 3 + topics = (TopicSpec(partition_count=partition_count), ) + + def __init__(self, test_context): + super(TopicDeleteAfterMovementTest, + self).__init__(test_context=test_context, num_brokers=4) + + self.kafka_tools = KafkaCliTools(self.redpanda) + + def movement_done(self, partition, assignments): + results = [] + for n in self.redpanda._started: + info = self.admin.get_partitions(self.topic, partition, node=n) + self.logger.info( + f"current assignments for {self.topic}-{partition}: {info}") + converged = PartitionMovementMixin._equal_assignments( + info["replicas"], assignments) + results.append(converged and info["status"] == "done") + return all(results) + + def move_topic(self, assignments): + for partition in range(3): + + def get_nodes(partition): + return list(r['node_id'] for r in partition['replicas']) + + nodes_before = set( + get_nodes(self.admin.get_partitions(self.topic, partition))) + nodes_after = {r['node_id'] for r in assignments} + if nodes_before == nodes_after: + continue + self.admin.set_partition_replicas(self.topic, partition, + assignments) + + wait_until(lambda: self.movement_done(partition, assignments), + timeout_sec=60, + backoff_sec=2) + + @cluster(num_nodes=4, log_allow_list=[r'filesystem error: remove failed']) + def topic_delete_orphan_files_after_move_test(self): + + # Write out 10MB per partition + self.kafka_tools.produce(self.topic, + record_size=4096, + num_records=2560 * self.partition_count) + + self.admin = Admin(self.redpanda) + + # Move every partition to nodes 1,2,3 + assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]] + self.move_topic(assignments) + + down_node = self.redpanda.nodes[0] + try: + # Make topic directory immutable to prevent deleting + down_node.account.ssh( + f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + # Move every partition from node 1 to node 4 + new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]] + self.move_topic(new_assignments) + + def topic_exist_on_every_node(redpanda, topic_name): + storage = redpanda.storage() + exist_on_every = all( + map(lambda n: topic_name in n.ns["kafka"].topics, + storage.nodes)) + return exist_on_every + + wait_until( + lambda: topic_exist_on_every_node(self.redpanda, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg="Topic doesn't exist on some node") + + self.redpanda.stop_node(down_node) + finally: + down_node.account.ssh( + f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.redpanda.start_node(down_node) + + def topic_deleted_on_down_node_and_exist_on_others( + redpanda, down_node, topic_name): + storage = redpanda.storage() + log_removed_on_down = topic_name not in next( + filter(lambda x: x.name == down_node.name, + storage.nodes)).ns["kafka"].topics + logs_not_removed_on_others = all( + map(lambda n: topic_name in n.ns["kafka"].topics, + filter(lambda x: x.name != down_node.name, storage.nodes))) + return log_removed_on_down and logs_not_removed_on_others + + wait_until( + lambda: topic_deleted_on_down_node_and_exist_on_others( + self.redpanda, down_node, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg= + "Topic storage was not removed on down node or removed on other") + + class TopicDeleteCloudStorageTest(RedpandaTest): partition_count = 3 topics = (TopicSpec(partition_count=partition_count,