From d9e31f311e5807f9a21873dfbe286ae0198aed33 Mon Sep 17 00:00:00 2001
From: Alexey Biryukov
Date: Thu, 9 Mar 2023 21:59:06 -0500
Subject: [PATCH] scale_tests: 1000 readers stage

Exhaust the cloud storage cache by having multiple consumers read at
random offsets.

Some of the cluster properties adjusted for this test require node
restarts, which is why they are set globally in the constructor and
apply to all tests; the other tests should not be affected by them.
---
 .../tiered_storage_stability_test.py          | 81 ++++++++++++++++++++-
 1 file changed, 78 insertions(+), 3 deletions(-)

diff --git a/tests/rptest/scale_tests/tiered_storage_stability_test.py b/tests/rptest/scale_tests/tiered_storage_stability_test.py
index cecfacaa1a1f6..832c2e0335e86 100644
--- a/tests/rptest/scale_tests/tiered_storage_stability_test.py
+++ b/tests/rptest/scale_tests/tiered_storage_stability_test.py
@@ -20,7 +20,8 @@
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 from rptest.services.redpanda import (RESTART_LOG_ALLOW_LIST, MetricsEndpoint,
                                       SISettings)
-from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.services.kgo_verifier_services import (KgoVerifierProducer,
+                                                   KgoVerifierRandomConsumer)
 from rptest.util import firewall_blocked
 import time
 
@@ -65,11 +66,20 @@ def __init__(self, test_ctx, *args, **kwargs):
                 # to pad out tiered storage metadata, we don't want them to
                 # get merged together.
                 'cloud_storage_enable_segment_merging': False,
+                # force reads to go through the cloud storage cache
+                # instead of being served from the in-memory batch cache
+                'disable_batch_cache': True,
+                'cloud_storage_cache_check_interval': 1000,
             },
             disable_cloud_storage_diagnostics=True,
             **kwargs)
-        si_settings = SISettings(self.redpanda._context,
-                                 log_segment_size=self.small_segment_size)
+        si_settings = SISettings(
+            self.redpanda._context,
+            log_segment_size=self.small_segment_size,
+            # a deliberately tiny cache, so that concurrent random reads
+            # keep evicting and re-hydrating segments
+            cloud_storage_cache_size=10 * 1024 * 1024,
+        )
         self.redpanda.set_si_settings(si_settings)
         self.rpk = RpkTool(self.redpanda)
         self.s3_port = si_settings.cloud_storage_api_endpoint_port
@@ -392,3 +402,68 @@ def topic_partitions_on_node():
                    timeout_sec=max(60, decomm_time),
                    backoff_sec=2)
         self.logger.info(f"{topic_partitions_on_node()} partitions moved")
+
+    @cluster(num_nodes=5, log_allow_list=NOS3_LOG_ALLOW_LIST)
+    def test_cloud_cache_thrash(self):
+        """
+        Try to exhaust the cloud storage cache by reading at random
+        offsets with many consumers.
+        """
+        segment_size = int(self.scaled_segment_size / 8)
+        self.setup_cluster(segment_bytes=segment_size,
+                           retention_local_bytes=2 * segment_size,
+                           extra_cluster_props={
+                               'cloud_storage_max_readers_per_shard': 256,
+                           })
+
+        producer = KgoVerifierProducer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            msg_size=128 * 1024,
+            # effectively unbounded; the producer is stopped explicitly
+            msg_count=5 * 1024 * 1024 * 1024 * 1024,
+            rate_limit_bps=self.scaled_data_bps,
+            custom_node=[self.preallocated_nodes[0]])
+        try:
+            producer.start()
+            wait_until(lambda: producer.produce_status.acked > 5000,
+                       timeout_sec=60,
+                       backoff_sec=1.0)
+            target_cloud_segments = 10 * self.scaled_num_partitions
+            wait_until(lambda: nodes_report_cloud_segments(
+                self.redpanda, target_cloud_segments),
+                       timeout_sec=600,
+                       backoff_sec=5)
+            producer.wait_for_offset_map()
+
+            # Exhaust the cloud storage cache with multiple consumers
+            # reading at random offsets.
+            self.stage_cloud_cache_thrash()
+
+        finally:
+            producer.stop()
+            producer.wait(timeout_sec=600)
+            self.free_preallocated_nodes()
+
+    def stage_cloud_cache_thrash(self):
+        self.logger.info("Starting consumers")
+        consumer = KgoVerifierRandomConsumer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            msg_size=128 * 1024,
+            rand_read_msgs=1,
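+            # four concurrent random-read loops on the consumer node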
+            parallel=4,
+            nodes=[self.preallocated_nodes[0]],
+            debug_logs=True,
+        )
+        try:
+            consumer.start(clean=False)
+            # let the random consumers hammer the cache for a while
+            time.sleep(240)
+        finally:
+            self.logger.info("Stopping consumers")
+            consumer.stop()
+            consumer.wait(timeout_sec=600)