class TopicDeleteAfterMovementTest(RedpandaTest):
    """
    Verify topic storage cleanup after partition movement.

    Scenario: all partitions are moved off one node while that node's topic
    directory is made immutable (so Redpanda's removal of the now-orphaned
    logs fails with 'filesystem error: remove failed'). After restoring
    mutability and restarting the node, the orphaned topic data must be
    removed on that node while remaining intact on the replica nodes.
    """
    partition_count = 3
    topics = (TopicSpec(partition_count=partition_count), )

    def __init__(self, test_context):
        super(TopicDeleteAfterMovementTest,
              self).__init__(test_context=test_context, num_brokers=4)

        self.kafka_tools = KafkaCliTools(self.redpanda)
        # Create the Admin client here rather than inside the test body:
        # movement_done() and move_topic() read self.admin, and constructing
        # it in __init__ removes the implicit "test must assign it first"
        # ordering dependency the helpers would otherwise have.
        self.admin = Admin(self.redpanda)

    def movement_done(self, partition, assignments):
        """Return True when every started node reports the partition's
        replica set equal to `assignments` with movement status 'done'."""
        results = []
        for n in self.redpanda._started:
            info = self.admin.get_partitions(self.topic, partition, node=n)
            self.logger.info(
                f"current assignments for {self.topic}-{partition}: {info}")
            converged = PartitionMovementMixin._equal_assignments(
                info["replicas"], assignments)
            results.append(converged and info["status"] == "done")
        return all(results)

    def move_topic(self, assignments):
        """Request `assignments` for every partition of self.topic and wait
        until each requested movement completes.

        Partitions already placed exactly on the requested node set are
        skipped (re-requesting the same replica set would be rejected).
        """
        # Use the class-level partition_count instead of a hard-coded 3 so
        # the helper stays correct if partition_count is ever changed.
        for partition in range(self.partition_count):

            def get_nodes(partition_info):
                # Renamed parameter: the original shadowed the loop
                # variable `partition`, which was confusing.
                return [r['node_id'] for r in partition_info['replicas']]

            nodes_before = set(
                get_nodes(self.admin.get_partitions(self.topic, partition)))
            nodes_after = {r['node_id'] for r in assignments}
            if nodes_before == nodes_after:
                # Nothing to move for this partition.
                continue
            self.admin.set_partition_replicas(self.topic, partition,
                                              assignments)

            # Bind `partition` as a default argument: wait_until may call
            # the predicate repeatedly, and binding eagerly avoids any
            # late-binding surprises with the loop variable.
            wait_until(
                lambda p=partition: self.movement_done(p, assignments),
                timeout_sec=60,
                backoff_sec=2)

    @cluster(num_nodes=4,
             log_allow_list=[r'filesystem error: remove failed'])
    def topic_delete_orphan_files_after_move_test(self):
        """Orphaned partition logs left behind by movement are deleted once
        the blocking condition (immutable directory) is lifted and the node
        restarts; replicas on other nodes are untouched."""

        # Write out 10MB per partition
        self.kafka_tools.produce(self.topic,
                                 record_size=4096,
                                 num_records=2560 * self.partition_count)

        # Move every partition to nodes 1,2,3
        assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]]
        self.move_topic(assignments)

        down_node = self.redpanda.nodes[0]
        try:
            # Make topic directory immutable to prevent deleting, so the
            # orphaned logs survive on node 1 after the next movement.
            down_node.account.ssh(
                f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

            # Move every partition from node 1 to node 4
            new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]]
            self.move_topic(new_assignments)

            def topic_exist_on_every_node(redpanda, topic_name):
                # True when the topic directory is present on all nodes,
                # including the orphaned copy pinned on the down node.
                storage = redpanda.storage()
                return all(topic_name in n.ns["kafka"].topics
                           for n in storage.nodes)

            wait_until(
                lambda: topic_exist_on_every_node(self.redpanda, self.topic),
                timeout_sec=30,
                backoff_sec=2,
                err_msg="Topic doesn't exist on some node")

            self.redpanda.stop_node(down_node)
        finally:
            # Always restore mutability (even on failure) so the node can
            # clean up the directory and later teardown doesn't break.
            down_node.account.ssh(
                f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

        self.redpanda.start_node(down_node)

        def topic_deleted_on_down_node_and_exist_on_others(
                redpanda, down_node, topic_name):
            # Orphaned logs must be gone on the restarted node, while the
            # replicas on every other node must still be present.
            storage = redpanda.storage()
            log_removed_on_down = topic_name not in next(
                filter(lambda x: x.name == down_node.name,
                       storage.nodes)).ns["kafka"].topics
            logs_not_removed_on_others = all(
                map(lambda n: topic_name in n.ns["kafka"].topics,
                    filter(lambda x: x.name != down_node.name,
                           storage.nodes)))
            return log_removed_on_down and logs_not_removed_on_others

        wait_until(
            lambda: topic_deleted_on_down_node_and_exist_on_others(
                self.redpanda, down_node, self.topic),
            timeout_sec=30,
            backoff_sec=2,
            err_msg=
            "Topic storage was not removed on down node or removed on other")
TopicDeleteCloudStorageTest(RedpandaTest): partition_count = 3 topics = (TopicSpec(partition_count=partition_count,