@skip_debug_mode
@cluster(num_nodes=7)
@matrix(use_topic_property=[True, False])
def test_moves_with_local_retention(self, use_topic_property):
    """Verify that local retention is respected on newly added nodes.

    Fills a tiered-storage-backed topic with data, then adds a fifth and
    a sixth node to the cluster. After each rebalance, checks that the
    on-disk usage of the new node stays within +/-10% of the requested
    local retention target, which is configured either as a per-topic
    property or as the cluster-wide default depending on
    ``use_topic_property``. Finally re-reads the topic to confirm no
    invalid reads.
    """
    log_segment_size = 2 * 1024 * 1024
    total_segments_per_partition = 40
    partition_cnt = 20
    msg_size = 16 * 1024  # 16 KiB
    data_size = log_segment_size * total_segments_per_partition * partition_cnt
    msg_cnt = data_size // msg_size

    # configure and start redpanda
    extra_rp_conf = {
        'cloud_storage_segment_max_upload_interval_sec': 10,
        'cloud_storage_manifest_max_upload_interval_sec': 10,
    }
    # shadow indexing is required when we want to leverage fast partition movements
    si_settings = SISettings(test_context=self.test_context,
                             log_segment_size=log_segment_size,
                             retention_local_strict=False)

    self.redpanda.set_extra_rp_conf(extra_rp_conf)
    self.redpanda.set_si_settings(si_settings)
    # start with only four of the allocated nodes; the remaining ones
    # are added later to trigger rebalances
    self.redpanda.start(nodes=self.redpanda.nodes[0:4])
    topic = TopicSpec(replication_factor=3,
                      partition_count=partition_cnt,
                      redpanda_remote_write=True,
                      redpanda_remote_read=True)

    total_replicas = 3 * partition_cnt

    self.client().create_topic(topic)
    requested_local_retention = log_segment_size * 15

    if use_topic_property:
        self.client().alter_topic_config(
            topic.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
            requested_local_retention)
    else:
        self.redpanda.set_cluster_config({
            "retention_local_target_bytes_default":
            requested_local_retention
        })

    self.logger.info(
        f"Producing {data_size} bytes of data in {msg_cnt} total messages")
    self.producer = KgoVerifierProducer(
        self.test_context,
        self.redpanda,
        topic.name,
        msg_size=msg_size,
        msg_count=msg_cnt,
        custom_node=self.preallocated_nodes)
    self.producer.start()
    self.producer.wait()

    # add fifth node
    self.redpanda.start_node(self.redpanda.nodes[4])
    self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                        timeout_sec=self.rebalance_timeout)

    def print_disk_usage(usage):
        # usage: mapping of node id -> total partition bytes on that node
        for n, b in usage.items():
            self.logger.info(
                f"node: {n} total partitions size: {b/(1024*1024)} Mb")

    def verify_node_disk_usage(nodes, node_id):
        # Assert that `node_id`'s kafka-data disk usage is within +/-10%
        # of (replica count on that node) * (local retention target).
        usage_per_node = self._kafka_usage(nodes=nodes)
        print_disk_usage(usage_per_node)
        replicas_per_node = self._topic_replicas_per_node()
        node_replicas = replicas_per_node[topic.name][node_id]

        target_size = node_replicas * requested_local_retention

        current_usage = usage_per_node[node_id]
        tolerance = 0.1
        # renamed from max/min to avoid shadowing the builtins
        upper_bound = target_size * (1.0 + tolerance)
        lower_bound = target_size * (1.0 - tolerance)
        self.logger.info(
            f"node {node_id} target size: {target_size}, current size: {current_usage}, expected range ({lower_bound}, {upper_bound})"
        )
        assert lower_bound < current_usage < upper_bound, \
            f"node {node_id} disk usage should be within the range " \
            f"({lower_bound}, {upper_bound}). Current value: {current_usage}"

    first_new_id = self.redpanda.node_id(self.redpanda.nodes[4])

    verify_node_disk_usage(self.redpanda.nodes[0:5], first_new_id)

    # add sixth node
    self.redpanda.start_node(self.redpanda.nodes[5])
    self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                        timeout_sec=self.rebalance_timeout)

    usage = self._kafka_usage(self.redpanda.nodes)
    print_disk_usage(usage)
    next_new_id = self.redpanda.node_id(self.redpanda.nodes[5])
    verify_node_disk_usage(self.redpanda.nodes, next_new_id)
    # verify that data can be read
    self.consumer = KgoVerifierSeqConsumer(self.test_context,
                                           self.redpanda,
                                           topic.name,
                                           msg_size,
                                           nodes=self.preallocated_nodes)

    self.consumer.start(clean=False)
    self.consumer.wait()

    # Bug fix: the second concatenated literal was missing the `f`
    # prefix, so the actual invalid-read count was never interpolated
    # into the failure message.
    assert self.consumer.consumer_status.validator.invalid_reads == 0, \
        f"Invalid reads in topic: {topic.name}, invalid reads count: " \
        f"{self.consumer.consumer_status.validator.invalid_reads}"