
Commit

scale_tests: 1000 readers stage
Exhaust the cloud cache with multiple consumers reading at random offsets.
Some cluster properties adjusted for this test require node restarts, which
is why they are set globally in the ctor and apply to all tests; the other
tests should not be affected by them.
dlex authored and andrewhsu committed Aug 17, 2023
1 parent b2768df commit d9e31f3
Showing 1 changed file with 71 additions and 3 deletions.
tests/rptest/scale_tests/tiered_storage_stability_test.py
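
In outline, the property split described in the commit message looks like this (a condensed, illustrative sketch; the class name and the exact super().__init__ signature are assumptions, and only the properties relevant to this change are shown):

    class TieredStorageStabilityTest(PreallocNodesTest):  # name assumed
        def __init__(self, test_ctx, *args, **kwargs):
            super().__init__(
                test_ctx,
                *args,
                extra_rp_conf={
                    # Restart-requiring properties live in the ctor and
                    # therefore apply to every test in the class.
                    'disable_batch_cache': True,
                    'cloud_storage_cache_check_interval': 1000,
                },
                **kwargs)

        def test_cloud_cache_thrash(self):
            # Runtime-adjustable properties are scoped per test.
            self.setup_cluster(extra_cluster_props={
                'cloud_storage_max_readers_per_shard': 256,
            })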
@@ -20,7 +20,8 @@
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.services.redpanda import (RESTART_LOG_ALLOW_LIST, MetricsEndpoint,
SISettings)
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.kgo_verifier_services import (KgoVerifierProducer,
KgoVerifierRandomConsumer)
from rptest.util import firewall_blocked
import time

@@ -65,11 +66,16 @@ def __init__(self, test_ctx, *args, **kwargs):
# to pad out tiered storage metadata, we don't want them to
# get merged together.
'cloud_storage_enable_segment_merging': False,
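# Bypass the in-memory batch cache and trim the cloud cache
# frequently (interval in milliseconds) so reads exercise
# tiered storage directly.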
'disable_batch_cache': True,
'cloud_storage_cache_check_interval': 1000,
},
disable_cloud_storage_diagnostics=True,
**kwargs)
si_settings = SISettings(self.redpanda._context,
log_segment_size=self.small_segment_size)
si_settings = SISettings(
self.redpanda._context,
log_segment_size=self.small_segment_size,
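# Deliberately tiny cloud cache (10 MiB) so concurrent random
# reads force constant eviction and re-fetching.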
cloud_storage_cache_size=10 * 1024 * 1024,
)
self.redpanda.set_si_settings(si_settings)
self.rpk = RpkTool(self.redpanda)
self.s3_port = si_settings.cloud_storage_api_endpoint_port
@@ -392,3 +398,65 @@ def topic_partitions_on_node():
timeout_sec=max(60, decomm_time),
backoff_sec=2)
self.logger.info(f"{topic_partitions_on_node()} partitions moved")

@cluster(num_nodes=5, log_allow_list=NOS3_LOG_ALLOW_LIST)
def test_cloud_cache_thrash(self):
"""
        Try to exhaust the cloud cache by reading at random offsets with
        many consumers.
"""
segment_size = int(self.scaled_segment_size / 8)
self.setup_cluster(segment_bytes=segment_size,
retention_local_bytes=2 * segment_size,
extra_cluster_props={
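                               # Raise the per-shard cap on concurrent
                               # remote readers so many consumers can
                               # read at once.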
'cloud_storage_max_readers_per_shard': 256,
})

try:
producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
self.topic_name,
msg_size=128 * 1024,
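                # Effectively unbounded message count; the producer runs
                # until it is stopped in the finally block.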
msg_count=5 * 1024 * 1024 * 1024 * 1024,
rate_limit_bps=self.scaled_data_bps,
custom_node=[self.preallocated_nodes[0]])
producer.start()
wait_until(lambda: producer.produce_status.acked > 5000,
timeout_sec=60,
backoff_sec=1.0)
target_cloud_segments = 10 * self.scaled_num_partitions
wait_until(lambda: nodes_report_cloud_segments(
self.redpanda, target_cloud_segments),
timeout_sec=600,
backoff_sec=5)
producer.wait_for_offset_map()

# Exhaust cloud cache with multiple consumers
# reading at random offsets
self.stage_cloud_cache_thrash()

finally:
producer.stop()
producer.wait(timeout_sec=600)
self.free_preallocated_nodes()

def stage_cloud_cache_thrash(self):
self.logger.info(f"Starting consumers")
consumer = KgoVerifierRandomConsumer(
self.test_context,
self.redpanda,
self.topic_name,
msg_size=128 * 1024,
rand_read_msgs=1,
parallel=4,
nodes=[self.preallocated_nodes[0]],
debug_logs=True,
)
try:
consumer.start(clean=False)
time.sleep(240)
finally:
self.logger.info(f"Stopping consumers")
consumer.stop()
consumer.wait(timeout_sec=600)
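
The fixed time.sleep(240) simply gives the four parallel random readers a window to churn the deliberately small cache; validation happens when the consumer is stopped. A possible refinement, shown as a sketch only (cache_thrash_observed is a hypothetical predicate, not part of this commit), would be to poll for evidence of churn with the wait_until helper already used above:

    # Hypothetical replacement for time.sleep(240): poll an assumed
    # cache_thrash_observed(redpanda) predicate with the same time budget.
    wait_until(lambda: cache_thrash_observed(self.redpanda),
               timeout_sec=240,
               backoff_sec=5)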
