tests: added scaling up test with non strict local retention
Signed-off-by: Michał Maślanka <michal@redpanda.com>
mmaslankaprv committed Jun 12, 2024
1 parent bf97be3 commit 4d56cfe
Showing 1 changed file with 112 additions and 0 deletions.
tests/rptest/tests/scaling_up_test.py
@@ -497,3 +497,115 @@ def verify_disk_usage(usage: dict, added_ids: list, percentage: float):
        assert self.consumer.consumer_status.validator.invalid_reads == 0, \
            f"Invalid reads in topic: {topic.name}, invalid reads count: " \
            f"{self.consumer.consumer_status.validator.invalid_reads}"

    @skip_debug_mode
    @cluster(num_nodes=7)
    @matrix(use_topic_property=[True, False])
    def test_moves_with_local_retention(self, use_topic_property):
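        """
        Verify that when the cluster is scaled up with non-strict local
        retention enabled, disk usage on the newly added nodes converges to
        the requested local retention target, configured either as a topic
        property or as a cluster-wide default.
        """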

        log_segment_size = 2 * 1024 * 1024
        total_segments_per_partition = 40
        partition_cnt = 20
        msg_size = 16 * 1024  # 16 KiB
        data_size = log_segment_size * total_segments_per_partition * partition_cnt
        msg_cnt = data_size // msg_size
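        # with these settings the test produces 2 MiB * 40 * 20 = 1600 MiB
        # of data, split into 102400 messages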

        # configure and start redpanda
        extra_rp_conf = {
            'cloud_storage_segment_max_upload_interval_sec': 10,
            'cloud_storage_manifest_max_upload_interval_sec': 10,
        }
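        # short upload intervals make segments and manifests reach cloud
        # storage quickly, so local data becomes trimmable soon after it
        # is produced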
        # shadow indexing is required to leverage fast partition movements
        si_settings = SISettings(test_context=self.test_context,
                                 log_segment_size=log_segment_size,
                                 retention_local_strict=False)
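        # retention_local_strict=False enables non-strict local retention:
        # the local retention target is treated as a soft target rather than
        # a hard cap on local storage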

        self.redpanda.set_extra_rp_conf(extra_rp_conf)
        self.redpanda.set_si_settings(si_settings)
        self.redpanda.start(nodes=self.redpanda.nodes[0:4])
        topic = TopicSpec(replication_factor=3,
                          partition_count=partition_cnt,
                          redpanda_remote_write=True,
                          redpanda_remote_read=True)

        total_replicas = 3 * partition_cnt

        self.client().create_topic(topic)
        requested_local_retention = log_segment_size * 15
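        # 15 segments of 2 MiB each: a 30 MiB local retention target per
        # partition replica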

        if use_topic_property:
            self.client().alter_topic_config(
                topic.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
                requested_local_retention)
        else:
            self.redpanda.set_cluster_config({
                "retention_local_target_bytes_default":
                requested_local_retention
            })

        self.logger.info(
            f"Producing {data_size} bytes of data in {msg_cnt} total messages")
        self.producer = KgoVerifierProducer(
            self.test_context,
            self.redpanda,
            topic.name,
            msg_size=msg_size,
            msg_count=msg_cnt,
            custom_node=self.preallocated_nodes)
        self.producer.start()
        self.producer.wait()

        # add fifth node
        self.redpanda.start_node(self.redpanda.nodes[4])
        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                            timeout_sec=self.rebalance_timeout)

        def print_disk_usage(usage):
            for n, b in usage.items():
                self.logger.info(
                    f"node: {n} total partitions size: {b/(1024*1024)} MiB")

        def verify_node_disk_usage(nodes, node_id):
            usage_per_node = self._kafka_usage(nodes=nodes)
            print_disk_usage(usage_per_node)
            replicas_per_node = self._topic_replicas_per_node()
            node_replicas = replicas_per_node[topic.name][node_id]

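            # a node is expected to hold roughly one local retention target's
            # worth of data for every replica it hosts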
            target_size = node_replicas * requested_local_retention

            current_usage = usage_per_node[node_id]
            tolerance = 0.1
            max_size = target_size * (1.0 + tolerance)
            min_size = target_size * (1.0 - tolerance)
            self.logger.info(
                f"node {node_id} target size: {target_size}, current size: {current_usage}, expected range ({min_size}, {max_size})"
            )
            assert min_size < current_usage < max_size, \
                f"node {node_id} disk usage should be within the range ({min_size}, {max_size}). Current value: {current_usage}"

        first_new_id = self.redpanda.node_id(self.redpanda.nodes[4])

        verify_node_disk_usage(self.redpanda.nodes[0:5], first_new_id)

        # add sixth node
        self.redpanda.start_node(self.redpanda.nodes[5])
        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                            timeout_sec=self.rebalance_timeout)

        usage = self._kafka_usage(self.redpanda.nodes)
        print_disk_usage(usage)
        next_new_id = self.redpanda.node_id(self.redpanda.nodes[5])
        verify_node_disk_usage(self.redpanda.nodes, next_new_id)
        # verify that data can be read
        self.consumer = KgoVerifierSeqConsumer(self.test_context,
                                               self.redpanda,
                                               topic.name,
                                               msg_size,
                                               nodes=self.preallocated_nodes)

        self.consumer.start(clean=False)
        self.consumer.wait()

        assert self.consumer.consumer_status.validator.invalid_reads == 0, \
            f"Invalid reads in topic: {topic.name}, invalid reads count: " \
            f"{self.consumer.consumer_status.validator.invalid_reads}"
