Skip to content

Commit

Permalink
ducktape: test orphan files after delete topic
Browse files Browse the repository at this point in the history
  • Loading branch information
ZeDRoman committed Mar 22, 2023
1 parent 6f9be07 commit 6c0fbca
Showing 1 changed file with 75 additions and 12 deletions.
87 changes: 75 additions & 12 deletions tests/rptest/tests/topic_delete_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,23 @@ def __init__(self, test_context):

self.kafka_tools = KafkaCliTools(self.redpanda)

def produce_until_partitions(self):
self.kafka_tools.produce(self.topic, 1024, 1024)
storage = self.redpanda.storage()
return len(list(storage.partitions("kafka", self.topic))) == 9

def dump_storage_listing(self):
for node in self.redpanda.nodes:
self.logger.error(f"Storage listing on {node.name}:")
for line in node.account.ssh_capture(
f"find {self.redpanda.DATA_DIR}"):
self.logger.error(line.strip())

@cluster(num_nodes=3)
@parametrize(with_restart=False)
@parametrize(with_restart=True)
def topic_delete_test(self, with_restart):
def produce_until_partitions():
self.kafka_tools.produce(self.topic, 1024, 1024)
storage = self.redpanda.storage()
return len(list(storage.partitions("kafka", self.topic))) == 9

wait_until(lambda: produce_until_partitions(),
wait_until(lambda: self.produce_until_partitions(),
timeout_sec=30,
backoff_sec=2,
err_msg="Expected partition did not materialize")
Expand All @@ -173,13 +180,69 @@ def produce_until_partitions():
err_msg="Topic storage was not removed")

except:
# On errors, dump listing of the storage location
for node in self.redpanda.nodes:
self.logger.error(f"Storage listing on {node.name}:")
for line in node.account.ssh_capture(
f"find {self.redpanda.DATA_DIR}"):
self.logger.error(line.strip())
self.dump_storage_listing()
raise

@cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed'])
def topic_delete_orphan_files_test(self):
wait_until(lambda: self.produce_until_partitions(),
timeout_sec=30,
backoff_sec=2,
err_msg="Expected partition did not materialize")

# Sanity check the kvstore checks: there should be at least one kvstore entry
# per partition while the topic exists.
assert sum(get_kvstore_topic_key_counts(
self.redpanda).values()) >= self.topics[0].partition_count

down_node = self.redpanda.nodes[-1]
try:
# Make topic directory immutable to prevent deleting
down_node.account.ssh(
f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

self.kafka_tools.delete_topic(self.topic)

def topic_deleted_on_all_nodes_except_one(redpanda, down_node,
topic_name):
storage = redpanda.storage()
log_not_removed_on_down = topic_name in next(
filter(lambda x: x.name == down_node.name,
storage.nodes)).ns["kafka"].topics
logs_removed_on_others = all(
map(
lambda n: topic_name not in n.ns["kafka"].topics,
filter(lambda x: x.name != down_node.name,
storage.nodes)))
return log_not_removed_on_down and logs_removed_on_others

try:
wait_until(
lambda: topic_deleted_on_all_nodes_except_one(
self.redpanda, down_node, self.topic),
timeout_sec=30,
backoff_sec=2,
err_msg=
"Topic storage was not removed from running nodes or removed from down node"
)
except:
self.dump_storage_listing()
raise

self.redpanda.stop_node(down_node)
finally:
down_node.account.ssh(
f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

self.redpanda.start_node(down_node)

try:
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
timeout_sec=10,
backoff_sec=2,
err_msg="Topic storage was not removed")
except:
self.dump_storage_listing()
raise


Expand Down

0 comments on commit 6c0fbca

Please sign in to comment.