From cefbef2fc2c40142537332e8695ec7030acfd92a Mon Sep 17 00:00:00 2001
From: ZeDRoman
Date: Wed, 1 Feb 2023 14:09:22 +0300
Subject: [PATCH 1/4] cluster: unify contains_node function

---
 src/v/cluster/controller_backend.cc | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 33fbc39f4ef35..d7a2b931eb859 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -56,20 +56,12 @@ namespace cluster {
 
 namespace {
 
-inline bool contains_node(
-  model::node_id id, const std::vector<model::broker_shard>& replicas) {
-    return std::any_of(
-      replicas.cbegin(), replicas.cend(), [id](const model::broker_shard& bs) {
-          return bs.node_id == id;
-      });
-}
-
 bool is_cross_core_update(model::node_id self, const topic_table_delta& delta) {
     if (!delta.previous_replica_set) {
         return false;
     }
     return has_local_replicas(self, delta.new_assignment.replicas)
-           && contains_node(self, *delta.previous_replica_set)
+           && contains_node(*delta.previous_replica_set, self)
            && !has_local_replicas(self, *delta.previous_replica_set);
 }
 
@@ -219,7 +211,7 @@ std::error_code check_configuration_update(
           change_revision);
         return errc::partition_configuration_revision_not_updated;
     }
-    const bool includes_self = contains_node(self, bs);
+    const bool includes_self = contains_node(bs, self);
 
     /*
      * if configuration includes current node, we expect configuration to be
@@ -981,7 +973,7 @@ controller_backend::process_partition_reconfiguration(
      * 1) shutdown partition instance
      * 2) create instance on target remote core
      */
-    if (contains_node(_self, target_assignment.replicas)) {
+    if (contains_node(target_assignment.replicas, _self)) {
        co_return co_await shutdown_on_current_shard(
          std::move(ntp), command_rev);
     }
@@ -1006,7 +998,7 @@ controller_backend::process_partition_reconfiguration(
         // Wait for the operation to be finished on one of the nodes
         co_return errc::waiting_for_reconfiguration_finish;
     }
-    const auto cross_core_move = contains_node(_self, previous_replicas)
+    const auto cross_core_move = contains_node(previous_replicas, _self)
                                  && !has_local_replicas(
                                    _self, previous_replicas);
     /**
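For readers skimming the diff: the patch removes a file-local duplicate and switches callers to the shared helper, whose unified argument order is (replicas, node). The two membership checks reduce to the following rough Python model (names, types, and the shard-0 default are illustrative stand-ins, not the C++ API):

```python
# Rough Python model of the membership checks in the patch above --
# illustrative only, not the real cluster API.
from dataclasses import dataclass

@dataclass(frozen=True)
class BrokerShard:
    node_id: int
    shard: int

def contains_node(replicas, node_id):
    """Unified helper: is node_id anywhere in the replica set?"""
    return any(bs.node_id == node_id for bs in replicas)

def has_local_replicas(node_id, replicas, current_shard=0):
    """Does this node host a replica on the *current* shard (core)?"""
    return any(bs.node_id == node_id and bs.shard == current_shard
               for bs in replicas)

# A cross-core update: the node keeps the partition but on a different core,
# i.e. it is in the previous replica set yet no longer on this shard.
prev = [BrokerShard(node_id=1, shard=1)]
new = [BrokerShard(node_id=1, shard=0)]
assert contains_node(prev, 1) and not has_local_replicas(1, prev)
assert has_local_replicas(1, new)
```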
From 2862c324ca2513040a138e846e40763a5fb43c28 Mon Sep 17 00:00:00 2001
From: ZeDRoman
Date: Mon, 6 Feb 2023 16:40:02 +0300
Subject: [PATCH 2/4] controller: clean up topic orphan files

When redpanda is restarted while a delete operation has not finished,
partition files might be left on disk. We need to clean up orphan
partition files after bootstrap.

---
 src/v/cluster/controller_backend.cc | 117 ++++++++++++++++++++++
 src/v/cluster/controller_backend.h  |  14 ++++
 src/v/storage/fs_utils.h            |  20 +++++
 src/v/storage/log_manager.cc        |  95 ++++++++++++++++++++++
 src/v/storage/log_manager.h         |  12 +++
 5 files changed, 258 insertions(+)

diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index d7a2b931eb859..3b436585dcf5d 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -43,6 +43,7 @@
 #include
 #include
+#include
 #include
 #include
 
@@ -318,10 +319,76 @@ void controller_backend::setup_metrics() {
         "Number of partitions with ongoing/requested operations")),
     });
 }
+
+/**
+ * Create a snapshot of the topic table that contains every ntp that must
+ * be present in local disk storage, together with its revision.
+ * This snapshot helps identify orphan topic files: if a topic is not
+ * present in the snapshot, or its on-disk revision is lower than the
+ * revision recorded in the snapshot, then its directory is orphan.
+ */
+static absl::flat_hash_map<model::ntp, model::revision_id>
+create_topic_table_snapshot(
+  ss::sharded<topic_table>& topics, model::node_id current_node) {
+    absl::flat_hash_map<model::ntp, model::revision_id> snapshot;
+
+    for (const auto& nt : topics.local().all_topics()) {
+        auto ntp_view = model::topic_namespace_view(nt);
+        auto ntp_meta = topics.local().get_topic_metadata(ntp_view);
+        if (!ntp_meta) {
+            continue;
+        }
+        for (const auto& p : ntp_meta->get_assignments()) {
+            auto ntp = model::ntp(nt.ns, nt.tp, p.id);
+            if (!ntp_meta->is_topic_replicable()) {
+                snapshot.emplace(ntp, 0);
+                continue;
+            }
+            auto revision_id = ntp_meta->get_revision();
+            if (cluster::contains_node(p.replicas, current_node)) {
+                snapshot.emplace(ntp, revision_id);
+                continue;
+            }
+            auto target_replica_set = topics.local().get_target_replica_set(
+              ntp);
+            if (
+              target_replica_set
+              && cluster::contains_node(*target_replica_set, current_node)) {
+                snapshot.emplace(ntp, revision_id);
+                continue;
+            }
+            auto previous_replica_set = topics.local().get_previous_replica_set(
+              ntp);
+            if (
+              previous_replica_set
+              && cluster::contains_node(*previous_replica_set, current_node)) {
+                snapshot.emplace(ntp, revision_id);
+                continue;
+            }
+        }
+    }
+    return snapshot;
+}
 
 ss::future<> controller_backend::start() {
     setup_metrics();
     return bootstrap_controller_backend().then([this] {
+        if (ss::this_shard_id() == cluster::controller_stm_shard) {
+            auto bootstrap_revision = _topics.local().last_applied_revision();
+            auto snapshot = create_topic_table_snapshot(_topics, _self);
+            ssx::spawn_with_gate(
+              _gate,
+              [this, bootstrap_revision, snapshot = std::move(snapshot)] {
+                  return clear_orphan_topic_files(
+                           bootstrap_revision, std::move(snapshot))
+                    .handle_exception([](const std::exception_ptr& err) {
+                        vlog(
+                          clusterlog.error,
+                          "Exception while cleaning orphan files {}",
+                          err);
+                    });
+              });
+        }
         start_topics_reconciliation_loop();
 
         _housekeeping_timer.set_callback([this] { housekeeping(); });
         _housekeeping_timer.arm(_housekeeping_timer_interval);
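The rule packed into that snapshot loop is easier to see in miniature. A rough Python rendering (dict-based, names illustrative, not the C++ API):

```python
# Rough rendering of the snapshot rule above -- illustrative only.
# An ntp is recorded with its revision if this node appears in the current,
# target (in-progress move destination), or previous replica set.
def snapshot_entry(node, meta):
    """Return the revision to record for an ntp, or None to skip it."""
    if not meta["replicable"]:
        return 0  # recorded with revision 0: never judged orphan by revision
    for replica_set in (meta["replicas"], meta.get("target"),
                        meta.get("previous")):
        if replica_set and node in replica_set:
            return meta["revision"]
    return None  # node holds no replica; its local directory would be orphan

meta = {"replicable": True, "replicas": {1, 2}, "previous": {3}, "revision": 7}
assert snapshot_entry(3, meta) == 7      # still in the previous replica set
assert snapshot_entry(4, meta) is None   # not in any replica set
```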
@@ -577,6 +644,56 @@ ss::future<> controller_backend::fetch_deltas() {
     });
 }
 
+/**
+ * Topic files qualify as orphan if the ntp is missing from the topic
+ * table snapshot, or its on-disk revision is lower than the revision in
+ * the snapshot, and in addition the on-disk revision does not exceed the
+ * topic table's last applied revision.
+ */
+bool topic_files_are_orphan(
+  const model::ntp& ntp,
+  storage::partition_path::metadata ntp_directory_data,
+  const absl::flat_hash_map<model::ntp, model::revision_id>&
+    topic_table_snapshot,
+  model::revision_id last_applied_revision) {
+    vlog(clusterlog.debug, "Checking if topic files for ntp {} are orphan", ntp);
+
+    if (ntp_directory_data.revision_id > last_applied_revision) {
+        return false;
+    }
+    if (
+      topic_table_snapshot.contains(ntp)
+      && ntp_directory_data.revision_id
+           >= topic_table_snapshot.find(ntp)->second) {
+        return false;
+    }
+    return true;
+}
+
+ss::future<> controller_backend::clear_orphan_topic_files(
+  model::revision_id bootstrap_revision,
+  absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot) {
+    vlog(
+      clusterlog.info,
+      "Cleaning up orphan topic files. bootstrap_revision: {}",
+      bootstrap_revision);
+    // Init with the default namespace so we clean up even when there are no
+    // topics
+    absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
+    for (const auto& t : _topics.local().all_topics()) {
+        namespaces.emplace(t.ns);
+    }
+
+    return _storage.local().log_mgr().remove_orphan_files(
+      _data_directory,
+      std::move(namespaces),
+      [&,
+       bootstrap_revision,
+       topic_table_snapshot = std::move(topic_table_snapshot)](
+        model::ntp ntp, storage::partition_path::metadata p) {
+          return topic_files_are_orphan(
+            ntp, p, topic_table_snapshot, bootstrap_revision);
+      });
+}
+
 void controller_backend::start_topics_reconciliation_loop() {
     ssx::spawn_with_gate(_gate, [this] {
         return ss::do_until(
diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h
index 99b2686ff305b..8822e16bd8eee 100644
--- a/src/v/cluster/controller_backend.h
+++ b/src/v/cluster/controller_backend.h
@@ -270,6 +270,20 @@ class controller_backend
     // Topics
     ss::future<> bootstrap_controller_backend();
 
+    /**
+     * Cleans up orphan topic files on redpanda startup. Orphan topic files
+     * are files left on disk after node manipulations that redpanda no
+     * longer knows about, so they cannot be removed through the default
+     * deletion path. Currently we may leave orphan topic files when
+     * redpanda is restarted while a partition deletion operation is in
+     * flight but has not finished yet.
+     * We assume orphan files can only appear across a redpanda restart, so
+     * we run the cleanup on startup, after bootstrap, once redpanda already
+     * knows about all topics that it should contain on disk.
+     */
+    ss::future<> clear_orphan_topic_files(
+      model::revision_id bootstrap_revision,
+      absl::flat_hash_map<model::ntp, model::revision_id> topic_table_snapshot);
     void start_topics_reconciliation_loop();
 
     ss::future<> fetch_deltas();
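The orphan predicate above boils down to two guards; a rough Python equivalent with the interesting cases spelled out (illustrative, not the C++ API):

```python
# Rough Python equivalent of topic_files_are_orphan above -- illustrative.
def topic_files_are_orphan(ntp, dir_revision, snapshot, last_applied_revision):
    # A directory created after the controller snapshot was taken may belong
    # to an operation we have not applied yet -- never treat it as orphan.
    if dir_revision > last_applied_revision:
        return False
    # Known to the topic table at this (or a newer) revision: keep it.
    if ntp in snapshot and dir_revision >= snapshot[ntp]:
        return False
    return True

snapshot = {"kafka/foo/0": 5}
assert not topic_files_are_orphan("kafka/foo/0", 5, snapshot, 10)   # current
assert topic_files_are_orphan("kafka/foo/0", 3, snapshot, 10)       # stale leftover
assert topic_files_are_orphan("kafka/bar/0", 2, snapshot, 10)       # deleted topic
assert not topic_files_are_orphan("kafka/bar/0", 12, snapshot, 10)  # too new to judge
```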
diff --git a/src/v/storage/fs_utils.h b/src/v/storage/fs_utils.h
index 2e5354dacaed8..50a4e40e3d147 100644
--- a/src/v/storage/fs_utils.h
+++ b/src/v/storage/fs_utils.h
@@ -67,6 +67,26 @@ class segment_full_path;
 
 class partition_path {
 public:
+    struct metadata {
+        model::partition_id partition_id;
+        model::revision_id revision_id;
+    };
+
+    /// Parse an ntp directory name
+    static std::optional<metadata>
+    parse_partition_directory(const ss::sstring& name) {
+        const std::regex re(R"(^(\d+)_(\d+)$)");
+        std::cmatch match;
+        if (!std::regex_match(name.c_str(), match, re)) {
+            return std::nullopt;
+        }
+        return metadata{
+          .partition_id = model::partition_id(
+            boost::lexical_cast<uint64_t>(match[1].str())),
+          .revision_id = model::revision_id(
+            boost::lexical_cast<uint64_t>(match[2].str()))};
+    }
+
     partition_path(const ntp_config& ntpc);
 
     operator std::filesystem::path() const { return make_path(); }
diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc
index 8d91d478374af..cad7658a2453e 100644
--- a/src/v/storage/log_manager.cc
+++ b/src/v/storage/log_manager.cc
@@ -9,9 +9,12 @@
 
 #include "storage/log_manager.h"
 
+#include "cluster/cluster_utils.h"
+#include "cluster/topic_table.h"
 #include "config/configuration.h"
 #include "likely.h"
 #include "model/fundamental.h"
+#include "model/metadata.h"
 #include "model/timestamp.h"
 #include "resource_mgmt/io_priority.h"
 #include "ssx/async-clear.h"
@@ -46,6 +49,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -430,6 +434,97 @@ ss::future<> log_manager::remove(model::ntp ntp) {
     co_await dispatch_topic_dir_deletion(topic_dir);
 }
 
+ss::future<> remove_orphan_partition_files(
+  ss::sstring topic_directory_path,
+  model::topic_namespace nt,
+  ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>&
+    orphan_filter) {
+    return directory_walker::walk(
+      topic_directory_path,
+      [topic_directory_path, nt, &orphan_filter](
+        ss::directory_entry entry) -> ss::future<> {
+          auto ntp_directory_data = partition_path::parse_partition_directory(
+            entry.name);
+          if (!ntp_directory_data) {
+              return ss::now();
+          }
+
+          auto ntp = model::ntp(nt.ns, nt.tp, ntp_directory_data->partition_id);
+          if (orphan_filter(ntp, *ntp_directory_data)) {
+              auto ntp_directory = std::filesystem::path(topic_directory_path)
+                                   / std::filesystem::path(entry.name);
+              vlog(stlog.info, "Cleaning up ntp directory {}", ntp_directory);
+              return ss::recursive_remove_directory(ntp_directory)
+                .handle_exception_type([ntp_directory](
+                                         std::filesystem::
+                                           filesystem_error const& err) {
+                    vlog(
+                      stlog.error,
+                      "Exception while cleaning orphan files for {} Error: {}",
+                      ntp_directory,
+                      err);
+                });
+          }
+          return ss::now();
+      });
+}
+
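Partition directories are named `<partition_id>_<revision_id>`, which is the pattern `parse_partition_directory` matches above; a minimal Python sketch of the same parse (illustrative):

```python
# Rough Python equivalent of partition_path::parse_partition_directory --
# illustrative. Partition directories are named "<partition_id>_<revision_id>".
import re

PARTITION_DIR_RE = re.compile(r"^(\d+)_(\d+)$")

def parse_partition_directory(name):
    m = PARTITION_DIR_RE.match(name)
    if m is None:
        return None  # not a partition directory (e.g. a stray file)
    return int(m.group(1)), int(m.group(2))

assert parse_partition_directory("0_25") == (0, 25)
assert parse_partition_directory("snapshots") is None
```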
+ss::future<> log_manager::remove_orphan_files(
+  ss::sstring data_directory_path,
+  absl::flat_hash_set<model::ns> namespaces,
+  ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
+    orphan_filter) {
+    auto data_directory_exist = co_await ss::file_exists(data_directory_path);
+    if (!data_directory_exist) {
+        co_return;
+    }
+
+    for (const auto& ns : namespaces) {
+        auto namespace_directory = std::filesystem::path(data_directory_path)
+                                   / std::filesystem::path(ss::sstring(ns));
+        auto namespace_directory_exist = co_await ss::file_exists(
+          namespace_directory.string());
+        if (!namespace_directory_exist) {
+            continue;
+        }
+        co_await directory_walker::walk(
+          namespace_directory.string(),
+          [this, &namespace_directory, &ns, &orphan_filter](
+            ss::directory_entry entry) -> ss::future<> {
+              if (entry.type != ss::directory_entry_type::directory) {
+                  return ss::now();
+              }
+              auto topic_directory = namespace_directory
+                                     / std::filesystem::path(entry.name);
+              return remove_orphan_partition_files(
+                       topic_directory.string(),
+                       model::topic_namespace(ns, model::topic(entry.name)),
+                       orphan_filter)
+                .then([this, topic_directory]() {
+                    vlog(
+                      stlog.info,
+                      "Trying to clean up topic directory {}",
+                      topic_directory);
+                    return dispatch_topic_dir_deletion(
+                      topic_directory.string());
+                })
+                .handle_exception_type(
+                  [](std::filesystem::filesystem_error const& err) {
+                      vlog(
+                        stlog.error,
+                        "Exception while cleaning orphan files {}",
+                        err);
+                  });
+          })
+          .handle_exception_type(
+            [](std::filesystem::filesystem_error const& err) {
+                vlog(
+                  stlog.error, "Exception while cleaning orphan files {}", err);
+            });
+    }
+    co_return;
+}
+
 ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) {
     return ss::smp::submit_to(
       0,
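The traversal just added walks `<data_dir>/<namespace>/<topic>/<partition_id>_<revision_id>`. A compact Python sketch of the same walk under that layout, with a caller-supplied filter (stand-in names, not the storage API):

```python
# Sketch of the on-disk layout remove_orphan_files walks -- illustrative;
# the filter callback and path handling are stand-ins, not the storage API.
# Layout: <data_dir>/<namespace>/<topic>/<partition_id>_<revision_id>/
import re
from pathlib import Path

def find_orphans(data_dir, namespaces, orphan_filter):
    """Yield partition directories that the filter marks as orphan."""
    for ns in namespaces:
        ns_dir = Path(data_dir) / ns
        if not ns_dir.is_dir():
            continue
        for topic_dir in (p for p in ns_dir.iterdir() if p.is_dir()):
            for part_dir in (p for p in topic_dir.iterdir() if p.is_dir()):
                m = re.match(r"^(\d+)_(\d+)$", part_dir.name)
                if m is None:
                    continue  # not a partition directory
                ntp = f"{ns}/{topic_dir.name}/{m.group(1)}"
                if orphan_filter(ntp, int(m.group(2))):
                    yield part_dir

# Example: mark everything orphan and list what would be removed.
for d in find_orphans("/var/lib/redpanda/data", ["kafka"], lambda ntp, rev: True):
    print("would remove", d)
```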
diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h
index 37b966a89829c..244fcae3cf1b8 100644
--- a/src/v/storage/log_manager.h
+++ b/src/v/storage/log_manager.h
@@ -11,9 +11,11 @@
 
 #pragma once
 
+#include "cluster/topic_table.h"
 #include "config/property.h"
 #include "features/feature_table.h"
 #include "model/fundamental.h"
+#include "model/metadata.h"
 #include "random/simple_time_jitter.h"
 #include "seastarx.h"
 #include "storage/batch_cache.h"
@@ -174,6 +176,16 @@ class log_manager {
      * rebalancing partitions across the cluster, etc...
      */
     ss::future<> remove(model::ntp);
 
+    /**
+     * Walks the entire directory structure in an async fiber to remove all
+     * orphan files in that directory. Whether a file is orphan is decided
+     * by the supplied orphan_filter.
+     */
+    ss::future<> remove_orphan_files(
+      ss::sstring data_directory_path,
+      absl::flat_hash_set<model::ns> namespaces,
+      ss::noncopyable_function<bool(model::ntp, partition_path::metadata)>
+        orphan_filter);
+
     ss::future<> stop();

From ab214edadeffd650ff4bac34dc49fe179208b68d Mon Sep 17 00:00:00 2001
From: ZeDRoman
Date: Mon, 6 Feb 2023 16:40:36 +0300
Subject: [PATCH 3/4] ducktape: test orphan files after topic delete

---
 tests/rptest/tests/topic_delete_test.py | 87 +++++++++++++++++++++----
 1 file changed, 75 insertions(+), 12 deletions(-)

diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py
index 33d98f7065d25..a82c2f1748b90 100644
--- a/tests/rptest/tests/topic_delete_test.py
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -140,16 +140,23 @@ def __init__(self, test_context):
 
         self.kafka_tools = KafkaCliTools(self.redpanda)
 
+    def produce_until_partitions(self):
+        self.kafka_tools.produce(self.topic, 1024, 1024)
+        storage = self.redpanda.storage()
+        return len(list(storage.partitions("kafka", self.topic))) == 9
+
+    def dump_storage_listing(self):
+        for node in self.redpanda.nodes:
+            self.logger.error(f"Storage listing on {node.name}:")
+            for line in node.account.ssh_capture(
+                    f"find {self.redpanda.DATA_DIR}"):
+                self.logger.error(line.strip())
+
     @cluster(num_nodes=3)
     @parametrize(with_restart=False)
     @parametrize(with_restart=True)
     def topic_delete_test(self, with_restart):
-        def produce_until_partitions():
-            self.kafka_tools.produce(self.topic, 1024, 1024)
-            storage = self.redpanda.storage()
-            return len(list(storage.partitions("kafka", self.topic))) == 9
-
-        wait_until(lambda: produce_until_partitions(),
+        wait_until(lambda: self.produce_until_partitions(),
                    timeout_sec=30,
                    backoff_sec=2,
                    err_msg="Expected partition did not materialize")
@@ -173,13 +180,69 @@ def produce_until_partitions():
                        err_msg="Topic storage was not removed")
 
         except:
-            # On errors, dump listing of the storage location
-            for node in self.redpanda.nodes:
-                self.logger.error(f"Storage listing on {node.name}:")
-                for line in node.account.ssh_capture(
-                        f"find {self.redpanda.DATA_DIR}"):
-                    self.logger.error(line.strip())
+            self.dump_storage_listing()
             raise
+
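The test above polls `topic_storage_purged`, which is defined elsewhere in the suite. A plausible sketch of such a predicate, reconstructed from the storage idioms these tests use (hypothetical, not the actual helper):

```python
# Hypothetical sketch of a topic_storage_purged-style predicate -- the real
# helper lives elsewhere in rptest; names here are illustrative.
def topic_storage_purged(redpanda, topic_name):
    """True once no node's on-disk storage still lists the topic under kafka/."""
    storage = redpanda.storage()
    return all(topic_name not in node.ns["kafka"].topics
               for node in storage.nodes)
```

As in the test, a predicate like this would be polled with `wait_until` rather than asserted once, since deletion is applied asynchronously on each node.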
+    @cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed'])
+    def topic_delete_orphan_files_test(self):
+        wait_until(lambda: self.produce_until_partitions(),
+                   timeout_sec=30,
+                   backoff_sec=2,
+                   err_msg="Expected partition did not materialize")
+
+        # Sanity-check the kvstore state: there should be at least one
+        # kvstore entry per partition while the topic exists.
+        assert sum(get_kvstore_topic_key_counts(
+            self.redpanda).values()) >= self.topics[0].partition_count
+
+        down_node = self.redpanda.nodes[-1]
+        try:
+            # Make the topic directory immutable to prevent deletion
+            down_node.account.ssh(
+                f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")
+
+            self.kafka_tools.delete_topic(self.topic)
+
+            def topic_deleted_on_all_nodes_except_one(redpanda, down_node,
+                                                      topic_name):
+                storage = redpanda.storage()
+                log_not_removed_on_down = topic_name in next(
+                    filter(lambda x: x.name == down_node.name,
+                           storage.nodes)).ns["kafka"].topics
+                logs_removed_on_others = all(
+                    map(
+                        lambda n: topic_name not in n.ns["kafka"].topics,
+                        filter(lambda x: x.name != down_node.name,
+                               storage.nodes)))
+                return log_not_removed_on_down and logs_removed_on_others
+
+            try:
+                wait_until(
+                    lambda: topic_deleted_on_all_nodes_except_one(
+                        self.redpanda, down_node, self.topic),
+                    timeout_sec=30,
+                    backoff_sec=2,
+                    err_msg=
+                    "Topic storage was not removed from running nodes or was removed from the down node"
+                )
+            except:
+                self.dump_storage_listing()
+                raise
+
+            self.redpanda.stop_node(down_node)
+        finally:
+            down_node.account.ssh(
+                f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")
+
+        self.redpanda.start_node(down_node)
+
+        try:
+            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
+                       timeout_sec=10,
+                       backoff_sec=2,
+                       err_msg="Topic storage was not removed")
+        except:
+            self.dump_storage_listing()
+            raise
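Both orphan-file tests simulate an unfinished delete the same way: make the topic directory immutable so removal fails on one node, then restore the flag afterwards. The trick in isolation (hypothetical wrapper and paths; `node.account.ssh` is ducktape's remote-access API):

```python
# The immutability trick both tests use, in isolation -- hypothetical
# wrapper; node.account.ssh comes from ducktape's RemoteAccount.
def with_immutable_topic_dir(node, data_dir, topic, action):
    """Run `action` while the topic directory cannot be deleted."""
    topic_dir = f"{data_dir}/kafka/{topic}"
    node.account.ssh(f"chattr +i {topic_dir}")      # +i: no unlink/rename
    try:
        action()                                    # e.g. delete the topic
    finally:
        node.account.ssh(f"chattr -i {topic_dir}")  # always restore the flag
```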
+ """ + partition_count = 3 + topics = (TopicSpec(partition_count=partition_count), ) + + def __init__(self, test_context): + super(TopicDeleteAfterMovementTest, + self).__init__(test_context=test_context, num_brokers=4) + + self.kafka_tools = KafkaCliTools(self.redpanda) + + def movement_done(self, partition, assignments): + results = [] + for n in self.redpanda._started: + info = self.admin.get_partitions(self.topic, partition, node=n) + self.logger.info( + f"current assignments for {self.topic}-{partition}: {info}") + converged = PartitionMovementMixin._equal_assignments( + info["replicas"], assignments) + results.append(converged and info["status"] == "done") + return all(results) + + def move_topic(self, assignments): + for partition in range(3): + + def get_nodes(partition): + return list(r['node_id'] for r in partition['replicas']) + + nodes_before = set( + get_nodes(self.admin.get_partitions(self.topic, partition))) + nodes_after = {r['node_id'] for r in assignments} + if nodes_before == nodes_after: + continue + self.admin.set_partition_replicas(self.topic, partition, + assignments) + + wait_until(lambda: self.movement_done(partition, assignments), + timeout_sec=60, + backoff_sec=2) + + @cluster(num_nodes=4, log_allow_list=[r'filesystem error: remove failed']) + def topic_delete_orphan_files_after_move_test(self): + + # Write out 10MB per partition + self.kafka_tools.produce(self.topic, + record_size=4096, + num_records=2560 * self.partition_count) + + self.admin = Admin(self.redpanda) + + # Move every partition to nodes 1,2,3 + assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]] + self.move_topic(assignments) + + down_node = self.redpanda.nodes[0] + try: + # Make topic directory immutable to prevent deleting + down_node.account.ssh( + f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + # Move every partition from node 1 to node 4 + new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]] + self.move_topic(new_assignments) + + def topic_exist_on_every_node(redpanda, topic_name): + storage = redpanda.storage() + exist_on_every = all( + map(lambda n: topic_name in n.ns["kafka"].topics, + storage.nodes)) + return exist_on_every + + wait_until( + lambda: topic_exist_on_every_node(self.redpanda, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg="Topic doesn't exist on some node") + + self.redpanda.stop_node(down_node) + finally: + down_node.account.ssh( + f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.redpanda.start_node(down_node) + + def topic_deleted_on_down_node_and_exist_on_others( + redpanda, down_node, topic_name): + storage = redpanda.storage() + log_removed_on_down = topic_name not in next( + filter(lambda x: x.name == down_node.name, + storage.nodes)).ns["kafka"].topics + logs_not_removed_on_others = all( + map(lambda n: topic_name in n.ns["kafka"].topics, + filter(lambda x: x.name != down_node.name, storage.nodes))) + return log_removed_on_down and logs_not_removed_on_others + + wait_until( + lambda: topic_deleted_on_down_node_and_exist_on_others( + self.redpanda, down_node, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg= + "Topic storage was not removed on down node or removed on other") + + class TopicDeleteCloudStorageTest(RedpandaTest): partition_count = 3 topics = (TopicSpec(partition_count=partition_count,