Skip to content

Commit

Permalink
tests: add core count decrease test case to shard_placement_test
Browse files (browse the repository at this point in the history)
  • Loading branch information
ztlpn committed Jun 28, 2024
1 parent 3c2c5b6 commit a2a27c6
Showing 1 changed file with 57 additions and 13 deletions.
70 changes: 57 additions & 13 deletions tests/rptest/tests/shard_placement_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,19 @@ def test_manual_rebalance(self):

@cluster(num_nodes=6)
def test_core_count_change(self):
self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1))
initial_core_count = self.redpanda.get_node_cpu_count()

self.redpanda.set_resource_settings(
ResourceSettings(num_cpus=initial_core_count - 1))
self.redpanda.start()

admin = Admin(self.redpanda)
rpk = RpkTool(self.redpanda)

n_partitions = 10

for topic in ["foo", "bar"]:
topics = ["foo", "bar"]
for topic in topics:
# create topics with rf=5 for ease of accounting
rpk.create_topic(topic, partitions=n_partitions, replicas=5)

Expand All @@ -365,18 +369,41 @@ def test_core_count_change(self):

node = self.redpanda.nodes[0]
node_id = self.redpanda.node_id(node)
self.redpanda.stop_node(node)
self.redpanda.set_resource_settings(ResourceSettings(num_cpus=2))
self.redpanda.start_node(node)
self.redpanda.wait_for_membership(first_start=False)

def restart_node(num_cpus):
    """Restart `node` with `num_cpus` cores and wait for the cluster to see it.

    Stops the node, applies new resource settings with the requested core
    count, starts the node again and waits for membership. It then polls
    the brokers list as seen from every node until the entry for the
    restarted broker reports `num_cores == num_cpus`.
    """
    self.redpanda.stop_node(node)
    self.redpanda.set_resource_settings(
        ResourceSettings(num_cpus=num_cpus))
    self.redpanda.start_node(node)
    self.redpanda.wait_for_membership(first_start=False)

    def configuration_updated():
        # Every node must report the new core count for the restarted
        # broker before the update counts as propagated.
        for n in self.redpanda.nodes:
            broker = next((b for b in admin.get_brokers(node=n)
                           if b["node_id"] == node_id), None)
            # A node that does not list the broker yet is treated as
            # "not updated" instead of raising IndexError, so that the
            # poll below keeps retrying until the timeout.
            if broker is None or broker["num_cores"] != num_cpus:
                return False
        return True

    wait_until(configuration_updated, timeout_sec=15, backoff_sec=2)

restart_node(num_cpus=initial_core_count)

# check that the node moved partitions to the new core
def check_balanced_shard_map(shard_map, num_cpus):
    """Assert that `shard_map` is balanced on the node for every known topic.

    Prints shard statistics, then for each name in `topics` checks the
    counts returned by `get_shard_counts_by_topic` for `node_id`:
    there must be exactly `num_cpus` entries, they must add up to
    `n_partitions`, and the largest and smallest may differ by at most 1.
    """
    self.print_shard_stats(shard_map)
    per_topic = self.get_shard_counts_by_topic(shard_map, node_id)
    for name in topics:
        # presumably one count per shard hosting replicas of this topic;
        # verify against get_shard_counts_by_topic
        per_shard = per_topic[name]
        assert len(per_shard) == num_cpus
        assert sum(per_shard) == n_partitions
        spread = max(per_shard) - min(per_shard)
        assert spread <= 1

shard_map = self.get_replica_shard_map([node], admin)
self.print_shard_stats(shard_map)
counts_by_topic = self.get_shard_counts_by_topic(shard_map, node_id)
assert len(counts_by_topic) > 0
for topic, shard_counts in counts_by_topic.items():
assert max(shard_counts) - min(shard_counts) <= 1
check_balanced_shard_map(shard_map, initial_core_count)

# do some manual moves and check that their effects remain
# if the core count doesn't change.
Expand Down Expand Up @@ -404,9 +431,26 @@ def test_core_count_change(self):
self.print_shard_stats(map_after_restart)
assert map_after_restart == shard_map

self.stop_client_load()
self.logger.info("decreasing core count...")

restart_node(num_cpus=initial_core_count - 1)
shard_map = self.get_replica_shard_map([node], admin)
check_balanced_shard_map(shard_map, initial_core_count - 1)

self.logger.info("creating another topic...")
rpk.create_topic("quux", partitions=n_partitions, replicas=5)
topics.append("quux")

# TODO: core count decrease (not supported yet)
shard_map = self.wait_shard_map_stationary([node], admin)
check_balanced_shard_map(shard_map, initial_core_count - 1)

self.logger.info("increasing core count back...")

restart_node(num_cpus=initial_core_count)
shard_map = self.get_replica_shard_map([node], admin)
check_balanced_shard_map(shard_map, initial_core_count)

self.stop_client_load()

@cluster(num_nodes=6)
def test_node_join(self):
Expand Down

0 comments on commit a2a27c6

Please sign in to comment.