diff --git a/tests/rptest/tests/random_node_operations_test.py b/tests/rptest/tests/random_node_operations_test.py
index 17f5e062b8265..8b7327e590ddd 100644
--- a/tests/rptest/tests/random_node_operations_test.py
+++ b/tests/rptest/tests/random_node_operations_test.py
@@ -9,6 +9,7 @@
 import random
 import threading
 
+from rptest.clients.rpk import RpkTool
 from rptest.services.admin import Admin
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 
@@ -19,7 +20,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.clients.default import DefaultClient
 from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
-from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST
+from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
 from rptest.services.redpanda_installer import RedpandaInstaller
 from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode
 from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload
@@ -84,11 +85,6 @@ def _setup_test_scale(self, num_to_upgrade):
         self.producer_timeout = 180
         self.consumer_timeout = 180
 
-        if num_to_upgrade > 0:
-            # we can only enable controller snapshot if all nodes are the newest version
-            cleanup_on_early_exit(self)
-            return
-
         if self.redpanda.dedicated_nodes:
             # scale test setup
             self.max_partitions = 32
@@ -122,8 +118,17 @@ def _setup_test_scale(self, num_to_upgrade):
             f"running test with: [message_size {self.msg_size}, total_bytes: {self.total_data}, message_count: {self.msg_count}, rate_limit: {self.rate_limit}, cluster_operations: {self.node_operations}]"
         )
 
-    def _start_redpanda(self, number_nodes_to_upgrade,
-                        enable_controller_snapshots):
+    def _start_redpanda(self, number_nodes_to_upgrade, with_tiered_storage):
+
+        if with_tiered_storage:
+            si_settings = SISettings(self.test_context,
+                                     cloud_storage_enable_remote_read=True,
+                                     cloud_storage_enable_remote_write=True)
+            # since this test deletes topics we must tolerate missing manifests
+            si_settings.set_expected_damage(
+                {"ntr_no_topic_manifest", "ntpr_no_manifest"})
+            self.redpanda.set_si_settings(si_settings)
+
         self.redpanda.set_seed_servers(self.redpanda.nodes)
         if number_nodes_to_upgrade > 0:
             installer = self.redpanda._installer
@@ -140,34 +145,54 @@ def _start_redpanda(self, number_nodes_to_upgrade,
             self.redpanda.start(auto_assign_node_id=True,
                                 omit_seeds_on_idx_one=False)
 
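+    # topic config changes propagate asynchronously through the controller,
+    # so the helper below retries the alteration until rpk reads back the
+    # expected value instead of assuming a single call took effect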
+    def _alter_local_topic_retention_bytes(self, topic, retention_bytes):
+        rpk = RpkTool(self.redpanda)
+
+        def alter_and_verify():
+            try:
+                rpk.alter_topic_config(
+                    topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
+                    retention_bytes)
+
+                cfgs = rpk.describe_topic_configs(topic)
+                retention = int(
+                    cfgs[TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES][0])
+                return retention == retention_bytes
+            except Exception:
+                return False
+
+        wait_until(alter_and_verify, timeout_sec=15, backoff_sec=0.5)
+
     class producer_consumer:
         def __init__(self,
                      test_context,
                      logger,
-                     topic_spec,
+                     topic_name,
                      redpanda,
                      nodes,
                      msg_size,
                      rate_limit_bps,
                      msg_count,
                      consumers_count,
+                     compaction_enabled=False,
                      key_set_cardinality=None):
             self.test_context = test_context
             self.logger = logger
-            self.topic = topic_spec
+            self.topic = topic_name
             self.redpanda = redpanda
             self.nodes = nodes
             self.msg_size = msg_size
             self.rate_limit_bps = rate_limit_bps
             self.msg_count = msg_count
             self.consumer_count = consumers_count
+            self.compaction_enabled = compaction_enabled
             self.key_set_cardinality = key_set_cardinality
 
         def _start_producer(self):
             self.producer = KgoVerifierProducer(
                 self.test_context,
                 self.redpanda,
-                self.topic.name,
+                self.topic,
                 self.msg_size,
                 self.msg_count,
                 custom_node=self.nodes,
@@ -180,20 +205,21 @@ def _start_producer(self):
                        timeout_sec=120,
                        backoff_sec=1)
 
-        def _start_consumer(self):
+        def _start_consumer(self, with_logs=False):
             self.consumer = KgoVerifierConsumerGroupConsumer(
                 self.test_context,
                 self.redpanda,
-                self.topic.name,
+                self.topic,
                 self.msg_size,
                 readers=self.consumer_count,
-                nodes=self.nodes)
+                nodes=self.nodes,
+                debug_logs=with_logs,
+                trace_logs=with_logs)
 
             self.consumer.start(clean=False)
 
         def start(self):
-            DefaultClient(self.redpanda).create_topic(self.topic)
             self.logger.info(
                 f"starting workload: topic: {self.topic}, with [rate_limit: {self.rate_limit_bps}, message size: {self.msg_size}, message count: {self.msg_count}]"
             )
@@ -207,6 +233,7 @@ def verify(self):
             )
 
             self.producer.wait()
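+            # bad offsets reported by the verifier producer are acks that
+            # landed at unexpected offsets, i.e. lost or reordered writes,
+            # so none are tolerated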
+            assert self.producer.produce_status.bad_offsets == 0
             # Await the consumer that is reading only the subset of data that
             # was written before it started.
             self.consumer.wait()
@@ -215,9 +242,9 @@ def verify(self):
             del self.consumer
 
             # Start a new consumer to read all data written
-            self._start_consumer()
+            self._start_consumer(with_logs=True)
             self.consumer.wait()
-            if self.topic.cleanup_policy != TopicSpec.CLEANUP_COMPACT:
+            if not self.compaction_enabled:
                 assert self.consumer.consumer_status.validator.valid_reads >= self.producer.produce_status.acked, f"Missing messages from topic: {self.topic}. valid reads: {self.consumer.consumer_status.validator.valid_reads}, acked messages: {self.producer.produce_status.acked}"
 
             assert self.consumer.consumer_status.validator.invalid_reads == 0, f"Invalid reads in topic: {self.topic}, invalid reads count: {self.consumer.consumer_status.validator.invalid_reads}"
 
@@ -226,12 +253,20 @@ def verify(self):
     @cluster(num_nodes=7,
              log_allow_list=CHAOS_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST)
     @matrix(enable_failures=[False, True],
-            num_to_upgrade=[0],
-            enable_controller_snapshots=[True, False])
+            num_to_upgrade=[0, 3],
+            enable_controller_snapshots=[True, False],
+            with_tiered_storage=[True, False])
     def test_node_operations(self, enable_failures, num_to_upgrade,
-                             enable_controller_snapshots):
+                             enable_controller_snapshots, with_tiered_storage):
         lock = threading.Lock()
+        default_segment_size = 1024 * 1024
+
+        # do not test controller snapshots with older versions
+        # as they do not support them
+        if num_to_upgrade > 0 and enable_controller_snapshots:
+            cleanup_on_early_exit(self)
+            return
 
         # setup test case scale parameters
         self._setup_test_scale(num_to_upgrade)
@@ -246,7 +281,8 @@ def test_node_operations(self, enable_failures, num_to_upgrade,
             return
 
         # start redpanda process
-        self._start_redpanda(num_to_upgrade, enable_controller_snapshots)
+        self._start_redpanda(num_to_upgrade,
+                             with_tiered_storage=with_tiered_storage)
 
         admin = Admin(self.redpanda)
         if enable_controller_snapshots:
@@ -257,33 +293,50 @@ def test_node_operations(self, enable_failures, num_to_upgrade,
         # create some initial topics
         self._create_topics(10)
 
+        regular_topic = TopicSpec(partition_count=self.max_partitions,
+                                  replication_factor=3,
+                                  cleanup_policy=TopicSpec.CLEANUP_DELETE,
+                                  segment_bytes=default_segment_size,
+                                  redpanda_remote_read=with_tiered_storage,
+                                  redpanda_remote_write=with_tiered_storage)
+        DefaultClient(self.redpanda).create_topic(regular_topic)
+
+        if with_tiered_storage:
+            # lower the local retention so that some local segments are deleted during the test
+            self._alter_local_topic_retention_bytes(regular_topic.name,
+                                                    3 * default_segment_size)
+
         regular_producer_consumer = RandomNodeOperationsTest.producer_consumer(
             test_context=self.test_context,
             logger=self.logger,
-            topic_spec=TopicSpec(partition_count=self.max_partitions,
-                                 replication_factor=3,
-                                 cleanup_policy=TopicSpec.CLEANUP_DELETE),
+            topic_name=regular_topic.name,
             redpanda=self.redpanda,
             nodes=[self.preallocated_nodes[0]],
             msg_size=self.msg_size,
             rate_limit_bps=self.rate_limit,
             msg_count=self.msg_count,
-            consumers_count=self.consumers_count)
+            consumers_count=self.consumers_count,
+            compaction_enabled=False)
+
+        compacted_topic = TopicSpec(partition_count=self.max_partitions,
+                                    cleanup_policy=TopicSpec.CLEANUP_COMPACT,
+                                    segment_bytes=default_segment_size,
+                                    redpanda_remote_read=with_tiered_storage,
+                                    redpanda_remote_write=with_tiered_storage)
+        DefaultClient(self.redpanda).create_topic(compacted_topic)
 
         compacted_producer_consumer = RandomNodeOperationsTest.producer_consumer(
             test_context=self.test_context,
             logger=self.logger,
-            topic_spec=TopicSpec(partition_count=self.max_partitions,
-                                 cleanup_policy=TopicSpec.CLEANUP_COMPACT,
-                                 segment_bytes=1 * 1024 * 1024),
+            topic_name=compacted_topic.name,
             redpanda=self.redpanda,
             nodes=[self.preallocated_nodes[1]],
             msg_size=self.msg_size,
             rate_limit_bps=self.rate_limit,
             msg_count=self.msg_count,
             consumers_count=self.consumers_count,
-            key_set_cardinality=500)
+            key_set_cardinality=500,
+            compaction_enabled=True)
 
         regular_producer_consumer.start()
         compacted_producer_consumer.start()
@@ -334,3 +387,11 @@ def test_node_operations(self, enable_failures, num_to_upgrade,
             self.logger.info(
                 f"Read {len(controller_records)} controller records from node {node.name} successfully"
             )
+
+        # restart all nodes so that tiered storage scrubbing can run
+        if with_tiered_storage:
+            self.redpanda.set_seed_servers(self.redpanda.nodes[0:3])
+            for n in self.redpanda.nodes:
+                self.redpanda.restart_nodes(n,
+                                            auto_assign_node_id=True,
+                                            omit_seeds_on_idx_one=False)
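+        # note: seeding from nodes[0:3] assumes those nodes are cluster
+        # members again once the random node operations have completed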