diff --git a/tests/rptest/scale_tests/tiered_storage_stability_test.py b/tests/rptest/scale_tests/tiered_storage_stability_test.py
index f97aa3ec2016..cecfacaa1a1f 100644
--- a/tests/rptest/scale_tests/tiered_storage_stability_test.py
+++ b/tests/rptest/scale_tests/tiered_storage_stability_test.py
@@ -41,7 +41,8 @@ class TieredStorageWithLoadTest(PreallocNodesTest):
     num_brokers = 4
     scaling_factor = num_brokers / 13
     scaled_data_bps = int(unscaled_data_bps * scaling_factor)  # ~0.53 GiB/s
-    scaled_num_partitions = int(unscaled_num_partitions * scaling_factor)  # 315
+    scaled_num_partitions = int(unscaled_num_partitions *
+                                scaling_factor)  # 315
     scaled_segment_size = int(regular_segment_size * scaling_factor)
     num_segments_per_partition = 1000
     unavailable_timeout = 60
@@ -73,12 +74,17 @@ def __init__(self, test_ctx, *args, **kwargs):
         self.rpk = RpkTool(self.redpanda)
         self.s3_port = si_settings.cloud_storage_api_endpoint_port
 
-    def setup_cluster(self, segment_bytes: int, retention_local_bytes: int, extra_cluster_props: dict = {}):
-        self.redpanda.set_cluster_config({
-            'partition_autobalancing_node_availability_timeout_sec': self.unavailable_timeout,
-            'partition_autobalancing_mode': 'continuous',
-            'raft_learner_recovery_rate': 10 * 1024 * 1024 * 1024,
-        } | extra_cluster_props)
+    def setup_cluster(self,
+                      segment_bytes: int,
+                      retention_local_bytes: int,
+                      extra_cluster_props: dict = {}):
+        self.redpanda.set_cluster_config(
+            {
+                'partition_autobalancing_node_availability_timeout_sec':
+                self.unavailable_timeout,
+                'partition_autobalancing_mode': 'continuous',
+                'raft_learner_recovery_rate': 10 * 1024 * 1024 * 1024,
+            } | extra_cluster_props)
         topic_config = {
             # Use a tiny segment size so we can generate many cloud segments
             # very quickly.
@@ -122,9 +128,13 @@ def load_many_segments(self):
 
         # Once some segments are generated, configure the topic to use more
         # realistic sizes.
-        retention_bytes = int(self.scaled_data_bps * 60 * 60 * 6 / self.scaled_num_partitions)
-        self.rpk.alter_topic_config(self.topic_name, 'segment.bytes', self.regular_segment_size)
-        self.rpk.alter_topic_config(self.topic_name, 'retention.local.target.bytes', retention_bytes)
+        retention_bytes = int(self.scaled_data_bps * 60 * 60 * 6 /
+                              self.scaled_num_partitions)
+        self.rpk.alter_topic_config(self.topic_name, 'segment.bytes',
+                                    self.regular_segment_size)
+        self.rpk.alter_topic_config(self.topic_name,
+                                    'retention.local.target.bytes',
+                                    retention_bytes)
 
     def get_node(self, idx: int):
         node = self.redpanda.nodes[idx]
@@ -150,8 +160,8 @@ def test_restarts(self):
         try:
             producer.start()
             wait_until(lambda: producer.produce_status.acked > 10000,
-                        timeout_sec=60,
-                        backoff_sec=1.0)
+                       timeout_sec=60,
+                       backoff_sec=1.0)
 
             # Run a rolling restart.
             self.stage_rolling_restart()
@@ -161,7 +171,13 @@ def test_restarts(self):
 
             # Stop a node, wait for enough time for movement to occur, then
             # restart.
-            self.stage_stop_wait_start()
+            self.stage_stop_wait_start(forced_stop=False,
+                                       downtime=self.unavailable_timeout)
+
+            # Stop a node and wait for a really long time to collect a lot
+            # of under-replicated msgs, then restart.
+            # This is not to be run nightly, so it is disabled for now.
+            #self.stage_stop_wait_start(forced_stop=False, downtime=60*30)
 
             # Block traffic to/from one node.
             self.stage_block_node_traffic()
@@ -172,17 +188,17 @@ def test_restarts(self):
             self.free_preallocated_nodes()
 
     NOS3_LOG_ALLOW_LIST = [
-        re.compile(
-            "s3 - .* - Accessing .*, unexpected REST API error "" detected, code: RequestTimeout"
-        ),
+        re.compile("s3 - .* - Accessing .*, unexpected REST API error "
+                   " detected, code: RequestTimeout"),
     ]
 
-
    # Stages for the "test_restarts"
 
     def stage_rolling_restart(self):
         self.logger.info(f"Rolling restarting nodes")
-        self.redpanda.rolling_restart_nodes(self.redpanda.nodes, start_timeout=600, stop_timeout=600)
+        self.redpanda.rolling_restart_nodes(self.redpanda.nodes,
+                                            start_timeout=600,
+                                            stop_timeout=600)
 
     def stage_block_node_traffic(self):
         node, node_id, node_str = self.get_node(0)
@@ -200,15 +216,24 @@ def stage_block_node_traffic(self):
             pass
 
         wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)
 
-    def stage_stop_wait_start(self):
+    def stage_stop_wait_start(self, forced_stop: bool, downtime: int):
         node, node_id, node_str = self.get_node(1)
-        self.logger.info(f"Hard stopping node {node_str}")
-        self.redpanda.stop_node(node, forced=True)
-        time.sleep(60)
-        self.logger.info(f"Restarting node {node_str}")
-        self.redpanda.start_node(node, timeout=600)
-        wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)
+        self.logger.info(
+            f"Stopping node {node_str} {'ungracefully' if forced_stop else 'gracefully'}"
+        )
+        self.redpanda.stop_node(node,
+                                forced=forced_stop,
+                                timeout=60 if forced_stop else 180)
+
+        self.logger.info(f"Node downtime {downtime} s")
+        time.sleep(downtime)
+        restart_timeout = 300 + int(900 * downtime / 60)
+        self.logger.info(f"Restarting node {node_str} for {restart_timeout} s")
+        self.redpanda.start_node(node, timeout=600)
+        wait_until(self.redpanda.healthy,
+                   timeout_sec=restart_timeout,
+                   backoff_sec=1)
 
     @cluster(num_nodes=5, log_allow_list=NOS3_LOG_ALLOW_LIST)
     def test_disrupt_cloud_storage(self):
@@ -219,7 +244,7 @@ def test_disrupt_cloud_storage(self):
         self.setup_cluster(
             # Segments should go into the cloud at a reasonable rate,
             # that's why it is smaller than it should be
-            segment_bytes = int(self.scaled_segment_size/2),
+            segment_bytes=int(self.scaled_segment_size / 2),
             retention_local_bytes=2 * self.scaled_segment_size,
         )
 
@@ -249,7 +274,8 @@ def _cloud_storage_no_new_errors(self, redpanda, logger=None):
         num_errors = redpanda.metric_sum(
             "redpanda_cloud_storage_errors_total",
             metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS)
-        increase = (num_errors - self.last_num_errors) if self.last_num_errors>0 else 0
+        increase = (num_errors -
+                    self.last_num_errors) if self.last_num_errors > 0 else 0
         self.last_num_errors = num_errors
         if logger:
             logger.info(
@@ -266,14 +292,18 @@ def stage_block_s3(self):
         self.last_num_errors = 0
         with firewall_blocked(self.redpanda.nodes, self.s3_port):
             # wait for the first cloud related failure + one minute
-            wait_until(lambda: not self._cloud_storage_no_new_errors(self.redpanda, self.logger),
-                       timeout_sec=600, backoff_sec=10)
+            wait_until(lambda: not self._cloud_storage_no_new_errors(
+                self.redpanda, self.logger),
+                       timeout_sec=600,
+                       backoff_sec=10)
             time.sleep(60)
         # make sure nothing is crashed
         wait_until(self.redpanda.healthy, timeout_sec=60, backoff_sec=1)
         self.logger.info(f"Waiting for S3 errors to cease")
-        wait_until(lambda: self._cloud_storage_no_new_errors(self.redpanda, self.logger),
-                   timeout_sec=600, backoff_sec=20)
+        wait_until(lambda: self._cloud_storage_no_new_errors(
+            self.redpanda, self.logger),
+                   timeout_sec=600,
+                   backoff_sec=20)
 
     @cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
     def test_decommission_and_add(self):
@@ -312,7 +342,7 @@ def test_decommission_and_add(self):
             producer.wait(timeout_sec=600)
         finally:
             self.free_preallocated_nodes()
-    
+
     def stage_decommission_and_add(self):
         node, node_id, node_str = self.get_node(1)
 
@@ -327,7 +357,7 @@ def topic_partitions_on_node():
             ])
             self.logger.debug(f"Partitions in the node-topic: {n}")
             return n
-    
+
         nt_partitions_before = topic_partitions_on_node()
 
         self.logger.info(
@@ -358,8 +388,7 @@ def topic_partitions_on_node():
         self.logger.info(
            f"Node added, new node_id: {new_node_id}, waiting for {int(nt_partitions_before/2)} partitions to move there in {int(decomm_time)} s"
        )
-        wait_until(
-            lambda: topic_partitions_on_node() > nt_partitions_before / 2,
-            timeout_sec=max(60, decomm_time),
-            backoff_sec=2)
+        wait_until(lambda: topic_partitions_on_node() > nt_partitions_before / 2,
+                   timeout_sec=max(60, decomm_time),
+                   backoff_sec=2)
         self.logger.info(f"{topic_partitions_on_node()} partitions moved")
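Note (reviewer sketch, not part of the patch): stage_stop_wait_start() now scales the post-restart health wait with how long the node was kept down, via restart_timeout = 300 + int(900 * downtime / 60). A minimal standalone illustration of the values this yields for the two downtimes that appear above (the nightly unavailable_timeout of 60 s and the currently disabled 30-minute stage):

def restart_timeout_sec(downtime_sec: int) -> int:
    # Same formula as the patch: 300 s base plus 900 s of recovery budget
    # per minute of downtime.
    return 300 + int(900 * downtime_sec / 60)

for downtime in (60, 30 * 60):
    # 60 s downtime  -> 1200 s wait; 1800 s downtime -> 27300 s wait
    print(f"downtime={downtime} s -> wait up to "
          f"{restart_timeout_sec(downtime)} s for the cluster to report healthy")

The longer a node is down, the more under-replicated data has to be re-replicated after restart (throttled by the raft_learner_recovery_rate set in setup_cluster), which is presumably why the wait grows with downtime instead of staying at the previous fixed 600 s.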