tests: added tiered storage to random node operations test
Signed-off-by: Michal Maslanka <michal@redpanda.com>
mmaslankaprv committed Sep 5, 2023
1 parent 025d283 commit 2827219
Showing 1 changed file with 90 additions and 29 deletions.
119 changes: 90 additions & 29 deletions tests/rptest/tests/random_node_operations_test.py
@@ -9,6 +9,7 @@

import random
import threading
from rptest.clients.rpk import RpkTool
from rptest.services.admin import Admin
from rptest.tests.prealloc_nodes import PreallocNodesTest

@@ -19,7 +20,7 @@
from rptest.clients.types import TopicSpec
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
-from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST
+from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode
from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload
@@ -84,11 +85,6 @@ def _setup_test_scale(self, num_to_upgrade):
self.producer_timeout = 180
self.consumer_timeout = 180

-        if num_to_upgrade > 0:
-            # we can only enable controller snapshot if all nodes are the newest version
-            cleanup_on_early_exit(self)
-            return

if self.redpanda.dedicated_nodes:
# scale test setup
self.max_partitions = 32
@@ -122,8 +118,17 @@ def _setup_test_scale(self, num_to_upgrade):
f"running test with: [message_size {self.msg_size}, total_bytes: {self.total_data}, message_count: {self.msg_count}, rate_limit: {self.rate_limit}, cluster_operations: {self.node_operations}]"
)

-    def _start_redpanda(self, number_nodes_to_upgrade,
-                        enable_controller_snapshots):
+    def _start_redpanda(self, number_nodes_to_upgrade, with_tiered_storage):

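        # When tiered storage is requested, configure cloud storage via
        # SISettings (shadow indexing settings) with remote read and write
        # enabled, so segments get uploaded and can be fetched back on read.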
if with_tiered_storage:
si_settings = SISettings(self.test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True)
            # since this test deletes topics, we must tolerate missing manifests
si_settings.set_expected_damage(
{"ntr_no_topic_manifest", "ntpr_no_manifest"})
self.redpanda.set_si_settings(si_settings)

self.redpanda.set_seed_servers(self.redpanda.nodes)
if number_nodes_to_upgrade > 0:
installer = self.redpanda._installer
@@ -140,34 +145,54 @@ def _start_redpanda(self,
self.redpanda.start(auto_assign_node_id=True,
omit_seeds_on_idx_one=False)

def _alter_local_topic_retention_bytes(self, topic, retention_bytes):
rpk = RpkTool(self.redpanda)

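        # Topic configuration changes propagate asynchronously, so retry the
        # alter-and-describe cycle until the reported value matches.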
def alter_and_verify():
try:
rpk.alter_topic_config(
topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
retention_bytes)

cfgs = rpk.describe_topic_configs(topic)
retention = int(
cfgs[TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES][0])
return retention == retention_bytes
            except Exception:
return False

wait_until(alter_and_verify, 15, 0.5)

class producer_consumer:
def __init__(self,
test_context,
logger,
-                 topic_spec,
+                 topic_name,
redpanda,
nodes,
msg_size,
rate_limit_bps,
msg_count,
consumers_count,
compaction_enabled=False,
key_set_cardinality=None):
self.test_context = test_context
self.logger = logger
-        self.topic = topic_spec
+        self.topic = topic_name
self.redpanda = redpanda
self.nodes = nodes
self.msg_size = msg_size
self.rate_limit_bps = rate_limit_bps
self.msg_count = msg_count
self.consumer_count = consumers_count
self.compaction_enabled = compaction_enabled
self.key_set_cardinality = key_set_cardinality

def _start_producer(self):
self.producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
-            self.topic.name,
+            self.topic,
self.msg_size,
self.msg_count,
custom_node=self.nodes,
@@ -180,20 +205,21 @@ def _start_producer(self):
timeout_sec=120,
backoff_sec=1)

-    def _start_consumer(self):
+    def _start_consumer(self, with_logs=False):

self.consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
-            self.topic.name,
+            self.topic,
self.msg_size,
readers=self.consumer_count,
-            nodes=self.nodes)
+            nodes=self.nodes,
+            debug_logs=with_logs,
+            trace_logs=with_logs)

self.consumer.start(clean=False)

def start(self):
-        DefaultClient(self.redpanda).create_topic(self.topic)
self.logger.info(
f"starting workload: topic: {self.topic}, with [rate_limit: {self.rate_limit_bps}, message size: {self.msg_size}, message count: {self.msg_count}]"
)
@@ -207,6 +233,7 @@ def verify(self):
)
self.producer.wait()

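        # The producer must have seen every ack land at the expected offset,
        # even across node operations and leadership changes.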
assert self.producer.produce_status.bad_offsets == 0
# Await the consumer that is reading only the subset of data that
# was written before it started.
self.consumer.wait()
@@ -215,9 +242,9 @@ def verify(self):
del self.consumer

# Start a new consumer to read all data written
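        # (verbose logs are enabled for this final pass, as it performs the
        # end-to-end validation that would need debugging on failure)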
-        self._start_consumer()
+        self._start_consumer(with_logs=True)
self.consumer.wait()
-        if self.topic.cleanup_policy != TopicSpec.CLEANUP_COMPACT:
+        if not self.compaction_enabled:
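            # compacted topics may drop superseded records, so an exact
            # produced-vs-consumed comparison is only valid without compaction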
assert self.consumer.consumer_status.validator.valid_reads >= self.producer.produce_status.acked, f"Missing messages from topic: {self.topic}. valid reads: {self.consumer.consumer_status.validator.valid_reads}, acked messages: {self.producer.produce_status.acked}"

assert self.consumer.consumer_status.validator.invalid_reads == 0, f"Invalid reads in topic: {self.topic}, invalid reads count: {self.consumer.consumer_status.validator.invalid_reads}"
@@ -226,12 +253,20 @@
@cluster(num_nodes=7,
log_allow_list=CHAOS_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST)
@matrix(enable_failures=[False, True],
-            num_to_upgrade=[0],
-            enable_controller_snapshots=[True, False])
+            num_to_upgrade=[0, 3],
+            enable_controller_snapshots=[True, False],
+            with_tiered_storage=[True, False])
def test_node_operations(self, enable_failures, num_to_upgrade,
-                             enable_controller_snapshots):
+                             enable_controller_snapshots, with_tiered_storage):

lock = threading.Lock()
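        # small 1 MiB segments force frequent segment rolls so that, with
        # tiered storage enabled, uploads and local eviction happen mid-test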
default_segment_size = 1024 * 1024

# do not test controller snapshots with older versions
# as they do not support it
if num_to_upgrade > 0 and enable_controller_snapshots:
cleanup_on_early_exit(self)
return

# setup test case scale parameters
self._setup_test_scale(num_to_upgrade)
@@ -246,7 +281,8 @@ def test_node_operations(self, enable_failures, num_to_upgrade,
return

# start redpanda process
-        self._start_redpanda(num_to_upgrade, enable_controller_snapshots)
+        self._start_redpanda(num_to_upgrade,
+                             with_tiered_storage=with_tiered_storage)

admin = Admin(self.redpanda)
if enable_controller_snapshots:
@@ -257,33 +293,50 @@

# create some initial topics
self._create_topics(10)
regular_topic = TopicSpec(partition_count=self.max_partitions,
replication_factor=3,
cleanup_policy=TopicSpec.CLEANUP_DELETE,
segment_bytes=default_segment_size,
redpanda_remote_read=with_tiered_storage,
redpanda_remote_write=with_tiered_storage)
DefaultClient(self.redpanda).create_topic(regular_topic)

if with_tiered_storage:
            # change local retention policy so that some local segments are deleted during the test
self._alter_local_topic_retention_bytes(regular_topic.name,
3 * default_segment_size)

regular_producer_consumer = RandomNodeOperationsTest.producer_consumer(
test_context=self.test_context,
logger=self.logger,
-            topic_spec=TopicSpec(partition_count=self.max_partitions,
-                                 replication_factor=3,
-                                 cleanup_policy=TopicSpec.CLEANUP_DELETE),
+            topic_name=regular_topic.name,
redpanda=self.redpanda,
nodes=[self.preallocated_nodes[0]],
msg_size=self.msg_size,
rate_limit_bps=self.rate_limit,
msg_count=self.msg_count,
-            consumers_count=self.consumers_count)
+            consumers_count=self.consumers_count,
+            compaction_enabled=False)

compacted_topic = TopicSpec(partition_count=self.max_partitions,
cleanup_policy=TopicSpec.CLEANUP_COMPACT,
segment_bytes=default_segment_size,
redpanda_remote_read=with_tiered_storage,
redpanda_remote_write=with_tiered_storage)
DefaultClient(self.redpanda).create_topic(compacted_topic)

compacted_producer_consumer = RandomNodeOperationsTest.producer_consumer(
test_context=self.test_context,
logger=self.logger,
-            topic_spec=TopicSpec(partition_count=self.max_partitions,
-                                 cleanup_policy=TopicSpec.CLEANUP_COMPACT,
-                                 segment_bytes=1 * 1024 * 1024),
+            topic_name=compacted_topic.name,
redpanda=self.redpanda,
nodes=[self.preallocated_nodes[1]],
msg_size=self.msg_size,
rate_limit_bps=self.rate_limit,
msg_count=self.msg_count,
consumers_count=self.consumers_count,
-            key_set_cardinality=500)
+            key_set_cardinality=500,
+            compaction_enabled=True)

regular_producer_consumer.start()
compacted_producer_consumer.start()
@@ -334,3 +387,11 @@ def test_node_operations(self, enable_failures, num_to_upgrade,
self.logger.info(
f"Read {len(controller_records)} controller records from node {node.name} successfully"
)

        # start nodes back up to enable tiered storage scrubbing
        if with_tiered_storage:
            self.redpanda.set_seed_servers(self.redpanda.nodes[0:3])
            for n in self.redpanda.nodes:
                self.redpanda.restart_nodes(n,
                                            auto_assign_node_id=True,
                                            omit_seeds_on_idx_one=False)
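For context: ducktape's @matrix decorator expands the parameter lists into their cross product, so this change grows the test from 4 parameterizations (2 x 1 x 2) to 16 (2 x 2 x 2 x 2), with the upgrade plus controller-snapshot combinations exiting early as shown above. A minimal sketch of the mechanism, using only ducktape's public ducktape.mark.matrix decorator (the class and method names below are illustrative, not from the repository):

    from ducktape.mark import matrix

    class MatrixSketch:
        # Each list is crossed with the others: this toy test runs
        # 2 * 2 = 4 times, once per (enable_failures, with_tiered_storage) pair.
        @matrix(enable_failures=[False, True],
                with_tiered_storage=[False, True])
        def test_combo(self, enable_failures, with_tiered_storage):
            assert isinstance(enable_failures, bool)
            assert isinstance(with_tiered_storage, bool)

In the real test the decorator is applied together with @cluster on a ducktape test class, as in the file above.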
