diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py index 687d465ea979b..20baaa86f4fbd 100644 --- a/tests/rptest/tests/shard_placement_test.py +++ b/tests/rptest/tests/shard_placement_test.py @@ -345,7 +345,10 @@ def test_manual_rebalance(self): @cluster(num_nodes=6) def test_core_count_change(self): - self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1)) + initial_core_count = self.redpanda.get_node_cpu_count() + + self.redpanda.set_resource_settings( + ResourceSettings(num_cpus=initial_core_count - 1)) self.redpanda.start() admin = Admin(self.redpanda) @@ -353,7 +356,8 @@ def test_core_count_change(self): n_partitions = 10 - for topic in ["foo", "bar"]: + topics = ["foo", "bar"] + for topic in topics: # create topics with rf=5 for ease of accounting rpk.create_topic(topic, partitions=n_partitions, replicas=5) @@ -365,18 +369,41 @@ def test_core_count_change(self): node = self.redpanda.nodes[0] node_id = self.redpanda.node_id(node) - self.redpanda.stop_node(node) - self.redpanda.set_resource_settings(ResourceSettings(num_cpus=2)) - self.redpanda.start_node(node) - self.redpanda.wait_for_membership(first_start=False) + + def restart_node(num_cpus): + self.redpanda.stop_node(node) + self.redpanda.set_resource_settings( + ResourceSettings(num_cpus=num_cpus)) + self.redpanda.start_node(node) + self.redpanda.wait_for_membership(first_start=False) + + def configuration_updated(): + for n in self.redpanda.nodes: + broker = [ + b for b in admin.get_brokers(node=n) + if b["node_id"] == node_id + ][0] + if broker["num_cores"] != num_cpus: + return False + return True + + wait_until(configuration_updated, timeout_sec=15, backoff_sec=2) + + restart_node(num_cpus=initial_core_count) # check that the node moved partitions to the new core + def check_balanced_shard_map(shard_map, num_cpus): + self.print_shard_stats(shard_map) + counts_by_topic = self.get_shard_counts_by_topic( + shard_map, node_id) + for topic in topics: + shard_counts = counts_by_topic[topic] + assert len(shard_counts) == num_cpus + assert sum(shard_counts) == n_partitions + assert max(shard_counts) - min(shard_counts) <= 1 + shard_map = self.get_replica_shard_map([node], admin) - self.print_shard_stats(shard_map) - counts_by_topic = self.get_shard_counts_by_topic(shard_map, node_id) - assert len(counts_by_topic) > 0 - for topic, shard_counts in counts_by_topic.items(): - assert max(shard_counts) - min(shard_counts) <= 1 + check_balanced_shard_map(shard_map, initial_core_count) # do some manual moves and check that their effects remain # if the core count doesn't change. @@ -404,9 +431,26 @@ def test_core_count_change(self): self.print_shard_stats(map_after_restart) assert map_after_restart == shard_map - self.stop_client_load() + self.logger.info("decreasing core count...") + + restart_node(num_cpus=initial_core_count - 1) + shard_map = self.get_replica_shard_map([node], admin) + check_balanced_shard_map(shard_map, initial_core_count - 1) + + self.logger.info("creating another topic...") + rpk.create_topic("quux", partitions=n_partitions, replicas=5) + topics.append("quux") - # TODO: core count decrease (not supported yet) + shard_map = self.wait_shard_map_stationary([node], admin) + check_balanced_shard_map(shard_map, initial_core_count - 1) + + self.logger.info("increasing core count back...") + + restart_node(num_cpus=initial_core_count) + shard_map = self.get_replica_shard_map([node], admin) + check_balanced_shard_map(shard_map, initial_core_count) + + self.stop_client_load() @cluster(num_nodes=6) def test_node_join(self):