Skip to content

Commit

Permalink
ducktape: test orphan files after partition movement
Browse files Browse the repository at this point in the history
  • Loading branch information
ZeDRoman committed Mar 22, 2023
1 parent 6c0fbca commit e6facfa
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions tests/rptest/tests/topic_delete_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from rptest.services.redpanda import CloudStorageType, SISettings
from rptest.util import wait_for_local_storage_truncate, firewall_blocked
from rptest.services.admin import Admin
from rptest.tests.partition_movement import PartitionMovementMixin


def get_kvstore_topic_key_counts(redpanda):
Expand Down Expand Up @@ -246,6 +247,112 @@ def topic_deleted_on_all_nodes_except_one(redpanda, down_node,
raise


class TopicDeleteAfterMovementTest(RedpandaTest):
"""
Verify that topic deleted after partition movement.
"""
partition_count = 3
topics = (TopicSpec(partition_count=partition_count), )

def __init__(self, test_context):
super(TopicDeleteAfterMovementTest,
self).__init__(test_context=test_context, num_brokers=4)

self.kafka_tools = KafkaCliTools(self.redpanda)

def movement_done(self, partition, assignments):
results = []
for n in self.redpanda._started:
info = self.admin.get_partitions(self.topic, partition, node=n)
self.logger.info(
f"current assignments for {self.topic}-{partition}: {info}")
converged = PartitionMovementMixin._equal_assignments(
info["replicas"], assignments)
results.append(converged and info["status"] == "done")
return all(results)

def move_topic(self, assignments):
for partition in range(3):

def get_nodes(partition):
return list(r['node_id'] for r in partition['replicas'])

nodes_before = set(
get_nodes(self.admin.get_partitions(self.topic, partition)))
nodes_after = {r['node_id'] for r in assignments}
if nodes_before == nodes_after:
continue
self.admin.set_partition_replicas(self.topic, partition,
assignments)

wait_until(lambda: self.movement_done(partition, assignments),
timeout_sec=60,
backoff_sec=2)

@cluster(num_nodes=4, log_allow_list=[r'filesystem error: remove failed'])
def topic_delete_orphan_files_after_move_test(self):

# Write out 10MB per partition
self.kafka_tools.produce(self.topic,
record_size=4096,
num_records=2560 * self.partition_count)

self.admin = Admin(self.redpanda)

# Move every partition to nodes 1,2,3
assignments = [dict(node_id=n, core=0) for n in [1, 2, 3]]
self.move_topic(assignments)

down_node = self.redpanda.nodes[0]
try:
# Make topic directory immutable to prevent deleting
down_node.account.ssh(
f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

# Move every partition from node 1 to node 4
new_assignments = [dict(node_id=n, core=0) for n in [2, 3, 4]]
self.move_topic(new_assignments)

def topic_exist_on_every_node(redpanda, topic_name):
storage = redpanda.storage()
exist_on_every = all(
map(lambda n: topic_name in n.ns["kafka"].topics,
storage.nodes))
return exist_on_every

wait_until(
lambda: topic_exist_on_every_node(self.redpanda, self.topic),
timeout_sec=30,
backoff_sec=2,
err_msg="Topic doesn't exist on some node")

self.redpanda.stop_node(down_node)
finally:
down_node.account.ssh(
f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

self.redpanda.start_node(down_node)

def topic_deleted_on_down_node_and_exist_on_others(
redpanda, down_node, topic_name):
storage = redpanda.storage()
log_removed_on_down = topic_name not in next(
filter(lambda x: x.name == down_node.name,
storage.nodes)).ns["kafka"].topics
logs_not_removed_on_others = all(
map(lambda n: topic_name in n.ns["kafka"].topics,
filter(lambda x: x.name != down_node.name, storage.nodes)))
return log_removed_on_down and logs_not_removed_on_others

wait_until(
lambda: topic_deleted_on_down_node_and_exist_on_others(
self.redpanda, down_node, self.topic),
timeout_sec=30,
backoff_sec=2,
err_msg=
"Topic storage was not removed on down node or removed on other")


class TopicDeleteCloudStorageTest(RedpandaTest):
partition_count = 3
topics = (TopicSpec(partition_count=partition_count,
Expand Down

0 comments on commit e6facfa

Please sign in to comment.