scale_tests: stop-wait-start stage generalized
The same stage function is now used for three tests: ungraceful stop and
start, stop-wait-and-start, and potentially a 30-minute stop.
dlex authored and andrewhsu committed Aug 17, 2023
1 parent a5763eb commit b2768df
Showing 1 changed file with 66 additions and 37 deletions.
103 changes: 66 additions & 37 deletions tests/rptest/scale_tests/tiered_storage_stability_test.py
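In outline, the change turns stage_stop_wait_start into a single parameterized stage. Below is a minimal sketch of the three call sites implied by the commit message; the forced_stop=True invocation for the ungraceful test is an assumption, since only the graceful call and the disabled 30-minute variant appear in the hunks below:

    # Graceful stop, wait long enough for partition movement to kick in, then restart.
    self.stage_stop_wait_start(forced_stop=False, downtime=self.unavailable_timeout)
    # Ungraceful (forced) stop and restart; assumed call site, not part of this diff.
    self.stage_stop_wait_start(forced_stop=True, downtime=self.unavailable_timeout)
    # Long outage to accumulate under-replicated messages; disabled in the diff below.
    # self.stage_stop_wait_start(forced_stop=False, downtime=60 * 30)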
@@ -41,7 +41,8 @@ class TieredStorageWithLoadTest(PreallocNodesTest):
num_brokers = 4
scaling_factor = num_brokers / 13
scaled_data_bps = int(unscaled_data_bps * scaling_factor) # ~0.53 GiB/s
scaled_num_partitions = int(unscaled_num_partitions * scaling_factor) # 315
scaled_num_partitions = int(unscaled_num_partitions *
scaling_factor) # 315
scaled_segment_size = int(regular_segment_size * scaling_factor)
num_segments_per_partition = 1000
unavailable_timeout = 60
@@ -73,12 +74,17 @@ def __init__(self, test_ctx, *args, **kwargs):
self.rpk = RpkTool(self.redpanda)
self.s3_port = si_settings.cloud_storage_api_endpoint_port

def setup_cluster(self, segment_bytes: int, retention_local_bytes: int, extra_cluster_props: dict = {}):
self.redpanda.set_cluster_config({
'partition_autobalancing_node_availability_timeout_sec': self.unavailable_timeout,
'partition_autobalancing_mode': 'continuous',
'raft_learner_recovery_rate': 10 * 1024 * 1024 * 1024,
} | extra_cluster_props)
def setup_cluster(self,
segment_bytes: int,
retention_local_bytes: int,
extra_cluster_props: dict = {}):
self.redpanda.set_cluster_config(
{
'partition_autobalancing_node_availability_timeout_sec':
self.unavailable_timeout,
'partition_autobalancing_mode': 'continuous',
'raft_learner_recovery_rate': 10 * 1024 * 1024 * 1024,
} | extra_cluster_props)
topic_config = {
# Use a tiny segment size so we can generate many cloud segments
# very quickly.
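In the set_cluster_config call above, the default settings are merged with extra_cluster_props using Python's dict union operator, where keys from the right-hand operand win, so callers can override any default. A small illustration with a hypothetical override value:

    defaults = {'partition_autobalancing_mode': 'continuous',
                'raft_learner_recovery_rate': 10 * 1024 * 1024 * 1024}
    extra = {'raft_learner_recovery_rate': 100 * 1024 * 1024}  # hypothetical override
    merged = defaults | extra  # dict union (Python 3.9+); right-hand keys win
    assert merged['raft_learner_recovery_rate'] == 100 * 1024 * 1024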
@@ -122,9 +128,13 @@ def load_many_segments(self):

# Once some segments are generated, configure the topic to use more
# realistic sizes.
retention_bytes = int(self.scaled_data_bps * 60 * 60 * 6 / self.scaled_num_partitions)
self.rpk.alter_topic_config(self.topic_name, 'segment.bytes', self.regular_segment_size)
self.rpk.alter_topic_config(self.topic_name, 'retention.local.target.bytes', retention_bytes)
retention_bytes = int(self.scaled_data_bps * 60 * 60 * 6 /
self.scaled_num_partitions)
self.rpk.alter_topic_config(self.topic_name, 'segment.bytes',
self.regular_segment_size)
self.rpk.alter_topic_config(self.topic_name,
'retention.local.target.bytes',
retention_bytes)
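The local retention target set here works out to roughly six hours of produced data per partition, assuming traffic spreads evenly. A rough sanity check using the approximate class constants quoted earlier (about 0.53 GiB/s and 315 partitions):

    scaled_data_bps = int(0.53 * 1024 ** 3)  # ~0.53 GiB/s, per the class-level comment
    scaled_num_partitions = 315              # per the class-level comment
    retention_bytes = int(scaled_data_bps * 60 * 60 * 6 / scaled_num_partitions)
    print(retention_bytes / 1024 ** 3)       # about 36 GiB of local retention per partition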

def get_node(self, idx: int):
node = self.redpanda.nodes[idx]
@@ -150,8 +160,8 @@ def test_restarts(self):
try:
producer.start()
wait_until(lambda: producer.produce_status.acked > 10000,
timeout_sec=60,
backoff_sec=1.0)
timeout_sec=60,
backoff_sec=1.0)

# Run a rolling restart.
self.stage_rolling_restart()
@@ -161,7 +171,13 @@

# Stop a node, wait for enough time for movement to occur, then
# restart.
self.stage_stop_wait_start()
self.stage_stop_wait_start(forced_stop=False,
downtime=self.unavailable_timeout)

# Stop a node and wait for a really long time to collect a lot
# of under-replicated msgs, then restart.
# This is not meant to be run nightly, so it is disabled for now
#self.stage_stop_wait_start(forced_stop=False, downtime=60*30)

# Block traffic to/from one node.
self.stage_block_node_traffic()
@@ -172,17 +188,17 @@
self.free_preallocated_nodes()

NOS3_LOG_ALLOW_LIST = [
re.compile(
"s3 - .* - Accessing .*, unexpected REST API error "" detected, code: RequestTimeout"
),
re.compile("s3 - .* - Accessing .*, unexpected REST API error "
" detected, code: RequestTimeout"),
]
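The allow-list entry itself is unchanged by the reflow: Python concatenates adjacent string literals at compile time, so splitting the pattern across two lines yields the same regex. A quick check:

    one_line = "unexpected REST API error " " detected, code: RequestTimeout"
    reflowed = ("unexpected REST API error "
                " detected, code: RequestTimeout")
    assert one_line == reflowed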


# Stages for the "test_restarts"

def stage_rolling_restart(self):
self.logger.info(f"Rolling restarting nodes")
self.redpanda.rolling_restart_nodes(self.redpanda.nodes, start_timeout=600, stop_timeout=600)
self.redpanda.rolling_restart_nodes(self.redpanda.nodes,
start_timeout=600,
stop_timeout=600)

def stage_block_node_traffic(self):
node, node_id, node_str = self.get_node(0)
@@ -200,15 +216,24 @@ def stage_block_node_traffic(self):
pass
wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)

def stage_stop_wait_start(self):
def stage_stop_wait_start(self, forced_stop: bool, downtime: int):
node, node_id, node_str = self.get_node(1)
self.logger.info(f"Hard stopping node {node_str}")
self.redpanda.stop_node(node, forced=True)
time.sleep(60)
self.logger.info(f"Restarting node {node_str}")
self.redpanda.start_node(node, timeout=600)
wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)
self.logger.info(
f"Stopping node {node_str} {'ungracefully' if forced_stop else 'gracefully'}"
)
self.redpanda.stop_node(node,
forced=forced_stop,
timeout=60 if forced_stop else 180)

self.logger.info(f"Node downtime {downtime} s")
time.sleep(downtime)

restart_timeout = 300 + int(900 * downtime / 60)
self.logger.info(f"Restarting node {node_str} for {restart_timeout} s")
self.redpanda.start_node(node, timeout=600)
wait_until(self.redpanda.healthy,
timeout_sec=restart_timeout,
backoff_sec=1)
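The health-check timeout scales with the length of the outage: five minutes of slack plus fifteen minutes of recovery budget for every minute of downtime. Plugging in the two downtimes this test uses, the 60 s default and the disabled 30-minute stop:

    def restart_timeout(downtime: int) -> int:
        return 300 + int(900 * downtime / 60)

    print(restart_timeout(60))       # 1200 s for the default 60 s outage
    print(restart_timeout(60 * 30))  # 27300 s for the disabled 30-minute outage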

@cluster(num_nodes=5, log_allow_list=NOS3_LOG_ALLOW_LIST)
def test_disrupt_cloud_storage(self):
@@ -219,7 +244,7 @@ def test_disrupt_cloud_storage(self):
self.setup_cluster(
# Segments should go into the cloud at a reasonable rate,
# that's why it is smaller than it should be
segment_bytes = int(self.scaled_segment_size/2),
segment_bytes=int(self.scaled_segment_size / 2),
retention_local_bytes=2 * self.scaled_segment_size,
)

@@ -249,7 +274,8 @@ def _cloud_storage_no_new_errors(self, redpanda, logger=None):
num_errors = redpanda.metric_sum(
"redpanda_cloud_storage_errors_total",
metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS)
increase = (num_errors - self.last_num_errors) if self.last_num_errors>0 else 0
increase = (num_errors -
self.last_num_errors) if self.last_num_errors > 0 else 0
self.last_num_errors = num_errors
if logger:
logger.info(
@@ -266,14 +292,18 @@ def stage_block_s3(self):
self.last_num_errors = 0
with firewall_blocked(self.redpanda.nodes, self.s3_port):
# wait for the first cloud related failure + one minute
wait_until(lambda: not self._cloud_storage_no_new_errors(self.redpanda, self.logger),
timeout_sec=600, backoff_sec=10)
wait_until(lambda: not self._cloud_storage_no_new_errors(
self.redpanda, self.logger),
timeout_sec=600,
backoff_sec=10)
time.sleep(60)
# make sure nothing is crashed
wait_until(self.redpanda.healthy, timeout_sec=60, backoff_sec=1)
self.logger.info(f"Waiting for S3 errors to cease")
wait_until(lambda: self._cloud_storage_no_new_errors(self.redpanda, self.logger),
timeout_sec=600, backoff_sec=20)
wait_until(lambda: self._cloud_storage_no_new_errors(
self.redpanda, self.logger),
timeout_sec=600,
backoff_sec=20)

@cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_decommission_and_add(self):
@@ -312,7 +342,7 @@ def test_decommission_and_add(self):
producer.wait(timeout_sec=600)
finally:
self.free_preallocated_nodes()

def stage_decommission_and_add(self):
node, node_id, node_str = self.get_node(1)

@@ -327,7 +357,7 @@ def topic_partitions_on_node():
])
self.logger.debug(f"Partitions in the node-topic: {n}")
return n

nt_partitions_before = topic_partitions_on_node()

self.logger.info(
@@ -358,8 +388,7 @@ def topic_partitions_on_node():
self.logger.info(
f"Node added, new node_id: {new_node_id}, waiting for {int(nt_partitions_before/2)} partitions to move there in {int(decomm_time)} s"
)
wait_until(
lambda: topic_partitions_on_node() > nt_partitions_before / 2,
timeout_sec=max(60, decomm_time),
backoff_sec=2)
wait_until(lambda: topic_partitions_on_node() > nt_partitions_before / 2,
timeout_sec=max(60, decomm_time),
backoff_sec=2)
self.logger.info(f"{topic_partitions_on_node()} partitions moved")
