diff --git a/tests/rptest/scale_tests/tiered_storage_stability_test.py b/tests/rptest/scale_tests/tiered_storage_stability_test.py
index cecfacaa1a1f6..832c2e0335e86 100644
--- a/tests/rptest/scale_tests/tiered_storage_stability_test.py
+++ b/tests/rptest/scale_tests/tiered_storage_stability_test.py
@@ -20,7 +20,8 @@
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 from rptest.services.redpanda import (RESTART_LOG_ALLOW_LIST, MetricsEndpoint,
                                       SISettings)
-from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.services.kgo_verifier_services import (KgoVerifierProducer,
+                                                   KgoVerifierRandomConsumer)
 from rptest.util import firewall_blocked
 import time
 
@@ -65,11 +66,16 @@ def __init__(self, test_ctx, *args, **kwargs):
                 # to pad out tiered storage metadata, we don't want them to
                 # get merged together.
                 'cloud_storage_enable_segment_merging': False,
+                'disable_batch_cache': True,
+                'cloud_storage_cache_check_interval': 1000,
             },
             disable_cloud_storage_diagnostics=True,
             **kwargs)
-        si_settings = SISettings(self.redpanda._context,
-                                 log_segment_size=self.small_segment_size)
+        si_settings = SISettings(
+            self.redpanda._context,
+            log_segment_size=self.small_segment_size,
+            cloud_storage_cache_size=10 * 1024 * 1024,
+        )
         self.redpanda.set_si_settings(si_settings)
         self.rpk = RpkTool(self.redpanda)
         self.s3_port = si_settings.cloud_storage_api_endpoint_port
@@ -392,3 +398,68 @@ def topic_partitions_on_node():
                    timeout_sec=max(60, decomm_time),
                    backoff_sec=2)
         self.logger.info(f"{topic_partitions_on_node()} partitions moved")
+
+    @cluster(num_nodes=5, log_allow_list=NOS3_LOG_ALLOW_LIST)
+    def test_cloud_cache_thrash(self):
+        """
+        Try to exhaust the cloud storage cache by reading at random
+        offsets with many concurrent consumers.
+        """
+        segment_size = int(self.scaled_segment_size / 8)
+        self.setup_cluster(segment_bytes=segment_size,
+                           retention_local_bytes=2 * segment_size,
+                           extra_cluster_props={
+                               'cloud_storage_max_readers_per_shard': 256,
+                           })
+
+        # The message count is deliberately enormous: the producer runs
+        # until it is explicitly stopped in the finally block below.
+        producer = KgoVerifierProducer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            msg_size=128 * 1024,
+            msg_count=5 * 1024 * 1024 * 1024 * 1024,
+            rate_limit_bps=self.scaled_data_bps,
+            custom_node=[self.preallocated_nodes[0]])
+        try:
+            producer.start()
+            wait_until(lambda: producer.produce_status.acked > 5000,
+                       timeout_sec=60,
+                       backoff_sec=1.0)
+            target_cloud_segments = 10 * self.scaled_num_partitions
+            wait_until(lambda: nodes_report_cloud_segments(
+                self.redpanda, target_cloud_segments),
+                       timeout_sec=600,
+                       backoff_sec=5)
+            producer.wait_for_offset_map()
+
+            # Exhaust the cloud cache with multiple consumers
+            # reading at random offsets
+            self.stage_cloud_cache_thrash()
+
+        finally:
+            producer.stop()
+            producer.wait(timeout_sec=600)
+            self.free_preallocated_nodes()
+
+    def stage_cloud_cache_thrash(self):
+        self.logger.info("Starting consumers")
+        consumer = KgoVerifierRandomConsumer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            msg_size=128 * 1024,
+            rand_read_msgs=1,
+            parallel=4,
+            nodes=[self.preallocated_nodes[0]],
+            debug_logs=True,
+        )
+        try:
+            consumer.start(clean=False)
+            # Let the random readers churn the cache for a few minutes
+            time.sleep(240)
+        finally:
+            self.logger.info("Stopping consumers")
+            consumer.stop()
+            consumer.wait(timeout_sec=600)
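
Reviewer note: test_cloud_cache_thrash polls a nodes_report_cloud_segments() helper that already exists in this file but is not shown in the diff. Below is a minimal sketch of the contract that wait_until() loop relies on, assuming a RedpandaService.metric_sum() accessor and a "redpanda_cloud_storage_segments" public metric; both names are assumptions for illustration, not something this diff confirms.

def nodes_report_cloud_segments(redpanda, target_segments):
    # Sum an uploaded-segments gauge across all brokers. The metric name
    # and the metric_sum() helper are assumptions, not part of this diff.
    try:
        num_segments = redpanda.metric_sum(
            "redpanda_cloud_storage_segments",
            metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS)
        redpanda.logger.info(
            f"Cluster reports {num_segments} cloud segments")
    except Exception:
        # Metrics can be transiently unavailable (e.g. while a node
        # restarts); report not-ready rather than failing the poll.
        return False
    return num_segments >= target_segments

The sizing choices in the diff all serve the same goal: disable_batch_cache forces reads to go through tiered storage rather than being served from memory, and the 10 MiB cloud_storage_cache_size is tiny relative to the working set, so four parallel random readers (rand_read_msgs=1) keep the cache in constant eviction and re-hydration.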