diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py
index 230ffa1021b62..adba1d60ba8fc 100644
--- a/tests/rptest/tests/partition_balancer_test.py
+++ b/tests/rptest/tests/partition_balancer_test.py
@@ -16,7 +16,7 @@
 from rptest.util import wait_until_result
 from rptest.utils.mode_checks import skip_debug_mode
 from rptest.clients.default import DefaultClient
-from rptest.services.redpanda import make_redpanda_service, CHAOS_LOG_ALLOW_LIST, MetricsEndpoint
+from rptest.services.redpanda import SISettings, make_redpanda_service, CHAOS_LOG_ALLOW_LIST, MetricsEndpoint
 from rptest.services.failure_injector import make_failure_injector, FailureSpec
 from rptest.services.admin_ops_fuzzer import AdminOperationsFuzzer
 from rptest.services.kgo_verifier_services import KgoVerifierProducer
@@ -658,7 +658,8 @@ def test_full_nodes(self):
                 "health_monitor_max_metadata_age": 3000,
                 "log_segment_size": 104857600,  # 100 MiB
                 "retention_local_target_capacity_percent": 100.0,
-                "disk_reservation_percent": 0.0,
+                # add disk reservation to buffer
+                "disk_reservation_percent": 10.0,
                 "retention_local_trim_interval": 3000
             },
             environment={"__REDPANDA_TEST_DISK_SIZE": disk_size})
@@ -748,6 +749,169 @@ def is_ready_and_stable(s):
             # and partition balancing is not invoked yet
             assert used_ratio < 0.81
 
+    @cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
+    def test_nodes_with_reclaimable_space(self):
+        """
+        Test partition balancer cooperation with space management policy.
+
+        Fills the disks above the balancer threshold with data that local
+        retention is allowed to reclaim, waits for the balancer to report a
+        stable "ready" status, then kills a random node and checks that its
+        replicas are moved away and that the status is stable again once
+        the node is back.
+        """
+
+        skip_reason = None
+        if not self.test_context.globals.get('use_xfs_partitions', False):
+            skip_reason = "looks like we are not using separate partitions for each node"
+        elif os.environ.get('BUILD_TYPE', None) == 'debug':
+            skip_reason = "debug builds are too slow"
+
+        if skip_reason:
+            self.logger.warning("skipping test: " + skip_reason)
+            # avoid the "Test requested 6 nodes, used only 0" error
+            self.redpanda = make_redpanda_service(self.test_context, 0)
+            self.test_context.cluster.alloc(ClusterSpec.simple_linux(6))
+            return
+
+        segment_size = 5 * 1024 * 1024  # 5MB
+        disk_size = 10 * 1024 * 1024 * 1024  # 10GB
+        partition_count = 30
+
+        # configure local retention to be below the balancer threshold
+        local_retention_percents = 50
+        local_retention_bytes = int(
+            (disk_size * (local_retention_percents / 100.0)) / partition_count)
+
+        si_settings = SISettings(
+            test_context=self.test_context,
+            log_segment_size=segment_size,
+            cloud_storage_segment_max_upload_interval_sec=5,
+            cloud_storage_manifest_max_upload_interval_sec=3,
+            cloud_storage_cache_size=1024 * 1024 * 1024,
+            retention_local_strict=False)
+
+        self.start_redpanda(
+            num_nodes=5,
+            extra_rp_conf={
+                "storage_min_free_bytes": 10 * 1024 * 1024,
+                "raft_learner_recovery_rate": 100_000_000,
+                "health_monitor_max_metadata_age": 3000,
+                "log_segment_size": segment_size,
+                "retention_local_trim_interval": 100,
+                # add disk reservation to buffer
+                "disk_reservation_percent": 10.0,
+                "partition_autobalancing_max_disk_usage_percent": 70,
+                'retention_local_target_capacity_percent': 100,
+                'storage_space_alert_free_threshold_percent': 0,
+                "retention_local_strict": False,
+                "partition_autobalancing_mode": "off"
+            },
+            environment={"__REDPANDA_TEST_DISK_SIZE": disk_size},
+            si_settings=si_settings)
+
+        self.topic = TopicSpec(partition_count=partition_count)
+        self.topic.redpanda_remote_read = True
+        self.topic.redpanda_remote_write = True
+        self.client().create_topic(self.topic)
+        self.client().alter_topic_config(
+            self.topic.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
+            local_retention_bytes)
+
+        def get_avg_disk_usage():
+            node_disk_sizes = {
+                n.account.hostname: self.redpanda.get_node_disk_usage(n)
+                for n in self.redpanda.nodes
+            }
+
+            for n, sz in node_disk_sizes.items():
+                self.logger.info(
+                    f"node: {n} used disk size: {sz/(1024*1024):.2f} MB, usage: {100.0*sz/disk_size:.2f} %"
+                )
+
+            return sum(node_disk_sizes.values()) / len(
+                self.redpanda.nodes) / disk_size
+
+        msg_size = 102_400
+        produce_batch_size = ceil(disk_size / msg_size / 30)
+        # produce until 80 percent of the disk is used on average
+        while get_avg_disk_usage() < 0.80:
+            producer = KgoVerifierProducer(self.test_context,
+                                           self.redpanda,
+                                           self.topic,
+                                           msg_size=msg_size,
+                                           msg_count=produce_batch_size)
+            producer.start(clean=False)
+            producer.wait_for_acks(produce_batch_size,
+                                   timeout_sec=120,
+                                   backoff_sec=5)
+            producer.stop()
+            producer.free()
+
+        def print_disk_usage_per_node():
+            for n in self.redpanda.nodes:
+                disk_usage = self.redpanda.get_node_disk_usage(n)
+                self.logger.info(
+                    f"node {self.redpanda.idx(n)}: "
+                    f"disk used percentage: {int(100.0 * disk_usage/disk_size)}"
+                )
+
+        # turn the partition balancer on
+        self.redpanda.set_cluster_config(
+            {"partition_autobalancing_mode": "continuous"})
+
+        # Wait until the balancer is active and stable for a few ticks in a row
+        ready_appeared_at = None
+
+        # even though the disk usage is above the balancer threshold there should be
+        # enough reclaimable space that the balancer remains inactive
+        def is_ready_and_stable(s):
+            print_disk_usage_per_node()
+            nonlocal ready_appeared_at
+            if s["status"] == "ready":
+                if ready_appeared_at is None:
+                    # Disable leader balancer after partition balancer has finished its work,
+                    # because leadership transfers will create new segments that can cause disk
+                    # usage to go over limit again even after we've verified that everything
+                    # is stable.
+                    self.redpanda.set_cluster_config(
+                        {"enable_leader_balancer": False})
+                    ready_appeared_at = time.time()
+                else:
+                    # ready status is stable for 11 seconds, should be enough for 3 ticks to pass
+                    return time.time() - ready_appeared_at > 11.0
+            else:
+                ready_appeared_at = None
+
+        self.wait_until_status(is_ready_and_stable)
+        # check the partition balancer can still work with the disk
+        # having reclaimable space but no free space
+        with self.NodeStopper(self) as ns:
+            ns.make_unavailable(random.choice(self.redpanda.nodes),
+                                failure_types=[FailureSpec.FAILURE_KILL])
+
+            # Wait until the balancer manages to move partitions from the killed node.
+            def is_in_progress(s):
+                print_disk_usage_per_node()
+                return s["status"] == "in_progress"
+
+            self.wait_until_status(is_in_progress)
+
+            def is_ready(s):
+                print_disk_usage_per_node()
+                return s["status"] == "ready"
+
+            self.wait_until_status(is_ready)
+            self.check_no_replicas_on_node(ns.cur_failure.node)
+
+            # bring failed node up
+            ns.make_available()
+
+        # forget the timestamp recorded by the first stability wait, otherwise
+        # the stale value makes the check below pass on the first "ready" status
+        ready_appeared_at = None
+        self.wait_until_status(is_ready_and_stable)
+
     @skip_debug_mode
     @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST)
     @matrix(kill_same_node=[True, False])