Skip to content

Commit

Permalink
tests: add disabling partitions APIs test
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Nov 3, 2023
1 parent 796cb35 commit 0451ed9
Showing 1 changed file with 146 additions and 0 deletions.
146 changes: 146 additions & 0 deletions tests/rptest/tests/recovery_mode_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
# by the Apache License, Version 2.0

import tempfile
import dataclasses

from ducktape.utils.util import wait_until

from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.cluster import cluster
from rptest.clients.rpk import RpkTool, RpkException
from rptest.services.admin import Admin
from rptest.services.rpk_producer import RpkProducer
from rptest.util import wait_until_result

Expand Down Expand Up @@ -196,3 +198,147 @@ def partitions_ready():
quiet=True).rstrip().split('\n')
# check that group seek was successful
assert len(consumed) == 2000


@dataclasses.dataclass
class PartitionInfo:
ns: str
topic: str
partition_id: int
disabled: bool

def from_json(json):
return PartitionInfo(**dict(
(f.name, json[f.name]) for f in dataclasses.fields(PartitionInfo)))


class DisablingPartitionsTest(RedpandaTest):
def __init__(self, *args, **kwargs):
super().__init__(*args,
num_brokers=4,
extra_rp_conf={"controller_snapshot_max_age_sec": 5},
**kwargs)

def sync(self):
admin = Admin(self.redpanda)
first = self.redpanda.nodes[0]
rest = self.redpanda.nodes[1:]

def equal_everywhere():
first_res = admin.get_cluster_partitions(node=first)
return all(
admin.get_cluster_partitions(node=n) == first_res
for n in rest)

# give some time for controller updates to propagate
wait_until(
equal_everywhere,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait for partitions metadata to equalize")

@cluster(num_nodes=4)
def test_apis(self):
rpk = RpkTool(self.redpanda)
admin = Admin(self.redpanda)

topics = ["mytopic1", "mytopic2", "mytopic3"]
for topic in topics:
rpk.create_topic(topic, partitions=3, replicas=3)

admin.set_partitions_disabled(ns="kafka", topic="mytopic1")

for p in [1, 2]:
admin.set_partitions_disabled(ns="kafka",
topic="mytopic2",
partition=p)
admin.set_partitions_disabled(ns="kafka",
topic="mytopic2",
partition=2,
value=False)

admin.set_partitions_disabled(ns="kafka", topic="mytopic3")
admin.set_partitions_disabled(ns="kafka",
topic="mytopic3",
partition=1,
value=False)

self.sync()

def pi(topic_partition, disabled=False):
topic, partition = topic_partition.split('/')
return PartitionInfo('kafka', topic, int(partition), disabled)

all_partitions = [
pi('mytopic1/0', True),
pi('mytopic1/1', True),
pi('mytopic1/2', True),
pi('mytopic2/0', False),
pi('mytopic2/1', True),
pi('mytopic2/2', False),
pi('mytopic3/0', True),
pi('mytopic3/1', False),
pi('mytopic3/2', True),
]

def filtered(topic, partition, disabled):
def filter(p):
if topic is not None and p.topic != topic:
return False
if partition is not None and p.partition_id != partition:
return False
if disabled is not None and p.disabled != disabled:
return False
return True

res = [p for p in all_partitions if filter(p)]
if partition is not None:
assert len(res) == 1
res = res[0]

return res

def get(topic=None, partition=None, disabled=None):
if topic is None and partition is None:
ns = None
else:
ns = "kafka"
if partition is None:
json = admin.get_cluster_partitions(ns=ns,
topic=topic,
disabled=disabled)

return [PartitionInfo.from_json(p) for p in json]
else:
json = admin.get_partition(ns, topic, partition)
return PartitionInfo.from_json(json)

def check_everything():
for topic in [None] + topics:
if topic is None:
partitions = [None]
else:
partitions = [None] + list(range(3))

for partition in partitions:
if partition is None:
disabled_list = [None, True, False]
else:
disabled_list = [None]

for disabled in disabled_list:
filter = (topic, partition, disabled)
expected = filtered(*filter)
got = get(*filter)
self.logger.debug(f"{filter=} {got=} {expected=}")
assert got == expected

check_everything()

for n in self.redpanda.nodes:
self.redpanda.wait_for_controller_snapshot(n)

self.redpanda.restart_nodes(self.redpanda.nodes)
self.redpanda.wait_for_membership(first_start=False)

check_everything()

0 comments on commit 0451ed9

Please sign in to comment.