diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index 10f30a88b00e1..a98524ad9bb2c 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -140,16 +140,23 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) + def produce_until_partitions(self): + self.kafka_tools.produce(self.topic, 1024, 1024) + storage = self.redpanda.storage() + return len(list(storage.partitions("kafka", self.topic))) == 9 + + def dump_storage_listing(self): + for node in self.redpanda.nodes: + self.logger.error(f"Storage listing on {node.name}:") + for line in node.account.ssh_capture( + f"find {self.redpanda.DATA_DIR}"): + self.logger.error(line.strip()) + @cluster(num_nodes=3) @parametrize(with_restart=False) @parametrize(with_restart=True) def topic_delete_test(self, with_restart): - def produce_until_partitions(): - self.kafka_tools.produce(self.topic, 1024, 1024) - storage = self.redpanda.storage() - return len(list(storage.partitions("kafka", self.topic))) == 9 - - wait_until(lambda: produce_until_partitions(), + wait_until(lambda: self.produce_until_partitions(), timeout_sec=30, backoff_sec=2, err_msg="Expected partition did not materialize") @@ -173,13 +180,69 @@ def produce_until_partitions(): err_msg="Topic storage was not removed") except: - # On errors, dump listing of the storage location - for node in self.redpanda.nodes: - self.logger.error(f"Storage listing on {node.name}:") - for line in node.account.ssh_capture( - f"find {self.redpanda.DATA_DIR}"): - self.logger.error(line.strip()) + self.dump_storage_listing() + raise + + @cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed']) + def topic_delete_orphan_files_test(self): + wait_until(lambda: self.produce_until_partitions(), + timeout_sec=30, + backoff_sec=2, + err_msg="Expected partition did not materialize") + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + + down_node = self.redpanda.nodes[-1] + try: + # Make topic directory immutable to prevent deleting + down_node.account.ssh( + f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.kafka_tools.delete_topic(self.topic) + + def topic_deleted_on_all_nodes_except_one(redpanda, down_node, + topic_name): + storage = redpanda.storage() + log_not_removed_on_down = topic_name in next( + filter(lambda x: x.name == down_node.name, + storage.nodes)).ns["kafka"].topics + logs_removed_on_others = all( + map( + lambda n: topic_name not in n.ns["kafka"].topics, + filter(lambda x: x.name != down_node.name, + storage.nodes))) + return log_not_removed_on_down and logs_removed_on_others + + try: + wait_until( + lambda: topic_deleted_on_all_nodes_except_one( + self.redpanda, down_node, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg= + "Topic storage was not removed from running nodes or removed from down node" + ) + except: + self.dump_storage_listing() + raise + self.redpanda.stop_node(down_node) + finally: + down_node.account.ssh( + f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.redpanda.start_node(down_node) + + try: + wait_until(lambda: topic_storage_purged(self.redpanda, self.topic), + timeout_sec=10, + backoff_sec=2, + err_msg="Topic storage was not removed") + except: + self.dump_storage_listing() raise