diff --git a/tests/rptest/tests/recovery_mode_test.py b/tests/rptest/tests/recovery_mode_test.py index 1f19bed39d89c..91b0348bf634a 100644 --- a/tests/rptest/tests/recovery_mode_test.py +++ b/tests/rptest/tests/recovery_mode_test.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0 import tempfile +import dataclasses from ducktape.utils.util import wait_until @@ -15,6 +16,7 @@ from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST from rptest.services.cluster import cluster from rptest.clients.rpk import RpkTool, RpkException +from rptest.services.admin import Admin from rptest.services.rpk_producer import RpkProducer from rptest.util import wait_until_result @@ -196,3 +198,146 @@ def partitions_ready(): quiet=True).rstrip().split('\n') # check that group seek was successful assert len(consumed) == 2000 + + +@dataclasses.dataclass +class PartitionInfo: + ns: str + topic: str + partition_id: int + disabled: bool + + def from_json(json): + return PartitionInfo(**dict( + (f.name, json[f.name]) for f in dataclasses.fields(PartitionInfo))) + + +class DisablingPartitionsTest(RedpandaTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, + num_brokers=4, + extra_rp_conf={"controller_snapshot_max_age_sec": 5}, + **kwargs) + + def sync(self): + admin = Admin(self.redpanda) + first = self.redpanda.nodes[0] + rest = self.redpanda.nodes[1:] + + def equal_everywhere(): + first_res = admin.get_cluster_partitions(node=first) + return all( + admin.get_cluster_partitions(node=n) == first_res + for n in rest) + + # give some time for controller updates to propagate + wait_until( + equal_everywhere, + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait for partitions metadata to equalize") + + @cluster(num_nodes=4) + def test_apis(self): + rpk = RpkTool(self.redpanda) + admin = Admin(self.redpanda) + + topics = ["mytopic1", "mytopic2", "mytopic3"] + for topic in topics: + rpk.create_topic(topic, partitions=3, replicas=3) + + admin.set_partitions_disabled(ns="kafka", topic="mytopic1") + + for p in [1, 2]: + admin.set_partitions_disabled(ns="kafka", + topic="mytopic2", + partition=p) + admin.set_partitions_disabled(ns="kafka", + topic="mytopic2", + partition=2, + value=False) + + admin.set_partitions_disabled(ns="kafka", topic="mytopic3") + admin.set_partitions_disabled(ns="kafka", + topic="mytopic3", + partition=1, + value=False) + + self.sync() + + def pi(topic_partition, disabled=False): + topic, partition = topic_partition.split('/') + return PartitionInfo('kafka', topic, int(partition), disabled) + + all_partitions = [ + pi('mytopic1/0', True), + pi('mytopic1/1', True), + pi('mytopic1/2', True), + pi('mytopic2/0', False), + pi('mytopic2/1', True), + pi('mytopic2/2', False), + pi('mytopic3/0', True), + pi('mytopic3/1', False), + pi('mytopic3/2', True), + ] + + def filtered(topic, partition, disabled): + def filter(p): + if topic is not None and p.topic != topic: + return False + if partition is not None and p.partition_id != partition: + return False + if disabled is not None and p.disabled != disabled: + return False + return True + + res = [p for p in all_partitions if filter(p)] + if partition is not None: + assert len(res) == 1 + res = res[0] + + return res + + def get(topic=None, partition=None, disabled=None): + if topic is None and partition is None: + ns = None + else: + ns = "kafka" + json = admin.get_cluster_partitions(ns=ns, + topic=topic, + partition=partition, + disabled=disabled) + if partition is None: + return [PartitionInfo.from_json(p) for p in json] + else: + return PartitionInfo.from_json(json) + + def check_everything(): + for topic in [None] + topics: + if topic is None: + partitions = [None] + else: + partitions = [None] + list(range(3)) + + for partition in partitions: + if partition is None: + disabled_list = [None, True, False] + else: + disabled_list = [None] + + for disabled in disabled_list: + filter = (topic, partition, disabled) + expected = filtered(*filter) + got = get(*filter) + self.logger.debug(f"{filter=} {got=} {expected=}") + assert got == expected + + check_everything() + + for n in self.redpanda.nodes: + self.redpanda.wait_for_controller_snapshot(n) + + self.redpanda.restart_nodes(self.redpanda.nodes) + self.redpanda.wait_for_membership(first_start=False) + + check_everything()