tests: added basic test verifying fast partition moves when adding a node
Signed-off-by: Michal Maslanka <michal@redpanda.com>
mmaslankaprv committed Oct 24, 2023
1 parent 68d4cbc commit 23fada2

Showing 1 changed file with 108 additions and 3 deletions:
tests/rptest/tests/scaling_up_test.py
@@ -16,10 +16,12 @@
from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.types import TopicSpec
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer, KgoVerifierProducer
from rptest.services.redpanda import SISettings
import concurrent

from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.utils.mode_checks import skip_debug_mode


class ScalingUpTest(PreallocNodesTest):
@@ -381,4 +383,107 @@ def initial_rebalance_finished():
        self.redpanda.start_node(unavailable_node)
        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                            timeout_sec=self.rebalance_timeout)
        self.verify()

    def _kafka_usage(self, nodes):
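        """Return kafka data directory disk usage in bytes, keyed by node id."""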
        usage_per_node = {}
        for n in nodes:
            id = self.redpanda.node_id(n)
            disk_usage = self.redpanda.data_dir_usage("kafka", n)
            usage_per_node[id] = disk_usage

        return usage_per_node

    @skip_debug_mode
    @cluster(num_nodes=7)
    def test_fast_node_addition(self):

        log_segment_size = 2 * 1024 * 1024
        total_segments_per_partition = 20
        partition_cnt = 40
        msg_size = 16 * 1024  # 16 KiB
        data_size = log_segment_size * total_segments_per_partition * partition_cnt
        msg_cnt = data_size // msg_size
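        # overall this produces 2 MiB * 20 segments * 40 partitions = 1600 MiB
        # of data, i.e. 1600 MiB / 16 KiB = 102,400 messages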

        # configure and start redpanda
        extra_rp_conf = {
            'cloud_storage_segment_max_upload_interval_sec': 10,
            'cloud_storage_manifest_max_upload_interval_sec': 10,
            # set the initial local retention target to a single segment
            'initial_retention_local_target_bytes_default': log_segment_size,
        }
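        # the short upload intervals above push segments and manifests to
        # object storage quickly, so local data can be trimmed down to the
        # initial retention target before the new nodes join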
        # shadow indexing is required when we want to leverage fast partition movements
        si_settings = SISettings(test_context=self.test_context,
                                 log_segment_size=log_segment_size,
                                 retention_local_strict=False)
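        # retention_local_strict=False treats the local retention target as a
        # soft limit rather than a hard cap on local data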

        self.redpanda.set_extra_rp_conf(extra_rp_conf)
        self.redpanda.set_si_settings(si_settings)
        self.redpanda.start(nodes=self.redpanda.nodes[0:4])
        topic = TopicSpec(replication_factor=3,
                          partition_count=partition_cnt,
                          redpanda_remote_write=True,
                          redpanda_remote_read=True)

        total_replicas = 3 * partition_cnt

        self.client().create_topic(topic)
        self.logger.info(
            f"Producing {data_size} bytes of data in {msg_cnt} total messages")
        self.producer = KgoVerifierProducer(
            self.test_context,
            self.redpanda,
            topic.name,
            msg_size=msg_size,
            msg_count=msg_cnt,
            custom_node=self.preallocated_nodes)
        self.producer.start()
        self.producer.wait()

        # add a fifth node (the cluster started with four)
        self.redpanda.start_node(self.redpanda.nodes[4])
        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                            timeout_sec=self.rebalance_timeout)

        def print_disk_usage(usage):
            for n, b in usage.items():
                self.logger.info(
                    f"node: {n} total partitions size: {b/(1024*1024)} MiB")

        def verify_disk_usage(usage: dict, added_ids: list, percentage: float):
            old_nodes_usage = [
                b for id, b in usage.items() if id not in added_ids
            ]
            avg_usage = sum(old_nodes_usage) / len(old_nodes_usage)

            for id in added_ids:
                added_node_usage = usage[id]
                assert added_node_usage < percentage * avg_usage, f"Added node {id} disk usage {added_node_usage} is too large, expected usage to be smaller than {percentage * avg_usage} bytes"
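        # with fast partition moves a new node only has to fetch data within
        # the initial local retention target (one segment per partition here),
        # so its disk usage should stay well below 20% of the average usage of
        # the established nodes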

        usage = self._kafka_usage(nodes=self.redpanda.nodes[0:5])
        print_disk_usage(usage)

        verify_disk_usage(usage,
                          [self.redpanda.node_id(self.redpanda.nodes[4])], 0.2)

        # add a sixth node
        self.redpanda.start_node(self.redpanda.nodes[5])
        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
                                            timeout_sec=self.rebalance_timeout)

        usage = self._kafka_usage(self.redpanda.nodes)
        print_disk_usage(usage)
        verify_disk_usage(usage, [
            self.redpanda.node_id(self.redpanda.nodes[4]),
            self.redpanda.node_id(self.redpanda.nodes[5])
        ], 0.2)
        # verify that data can be read
        self.consumer = KgoVerifierSeqConsumer(self.test_context,
                                               self.redpanda,
                                               topic.name,
                                               msg_size,
                                               nodes=self.preallocated_nodes)

        self.consumer.start(clean=False)
        self.consumer.wait()
        assert self.consumer.consumer_status.validator.invalid_reads == 0, f"Invalid reads in topic: {topic.name}, invalid reads count: {self.consumer.consumer_status.validator.invalid_reads}"
