tests: ability to dynamically parametrize cloud_storage_type #9283

Merged · 6 commits · Mar 20, 2023
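
This change replaces per-test hard-coded cloud storage parametrization (and the CloudStorageType.AUTO placeholder) with a helper, get_cloud_storage_type, that derives the applicable backends from the CLOUD_PROVIDER environment variable when tests are collected. In outline, the pattern applied across the diffs below looks like this (a condensed sketch, not a verbatim excerpt; the test class and method names are stand-ins):

```python
from ducktape.mark import matrix
from rptest.services.cluster import cluster
from rptest.services.redpanda import get_cloud_storage_type
from rptest.tests.redpanda_test import RedpandaTest


class ExampleArchivalTest(RedpandaTest):
    # Before this PR, backends were pinned per test:
    #   @parametrize(cloud_storage_type=CloudStorageType.S3)
    #   @parametrize(cloud_storage_type=CloudStorageType.ABS)
    # Now the backend list is computed at collection time:
    @cluster(num_nodes=3)
    @matrix(cloud_storage_type=get_cloud_storage_type())
    def test_write(self, cloud_storage_type):
        # Runs once per backend returned for the current CLOUD_PROVIDER.
        ...
```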
6 changes: 4 additions & 2 deletions tests/rptest/scale_tests/cloud_storage_compaction_test.py
@@ -13,9 +13,10 @@
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.redpanda import CloudStorageType, RedpandaService, MetricsEndpoint, SISettings
from rptest.services.redpanda import CloudStorageType, RedpandaService, MetricsEndpoint, SISettings, get_cloud_storage_type
from rptest.tests.end_to_end import EndToEndTest
from rptest.util import wait_until
from ducktape.mark import matrix

LOCAL_CONFIGURATION = {
"partition_amount": 3,
@@ -170,7 +171,8 @@ def _setup_read_replica(self):
"Cannot validate Kafka record batch. Missmatching CRC",
"batch has invalid CRC"
])
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_read_from_replica(self, cloud_storage_type):
self.start_workload()
self.start_consumer(num_nodes=2,
7 changes: 4 additions & 3 deletions tests/rptest/scale_tests/extreme_recovery_test.py
@@ -2,13 +2,13 @@
from time import time
from typing import Callable, Sequence

from ducktape.mark import ignore, parametrize
from ducktape.mark import ignore, matrix
from ducktape.tests.test import TestContext
from rptest.archival.s3_client import S3Client
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import CloudStorageType
from rptest.services.redpanda import CloudStorageType, get_cloud_storage_type
from rptest.services.cluster import cluster
from rptest.services.franz_go_verifiable_services import \
FranzGoVerifiableProducer, \
@@ -171,7 +171,8 @@ def tearDown(self):
super(ExtremeRecoveryTest, self).tearDown()

@cluster(num_nodes=8, log_allow_list=TRANSIENT_ERRORS)
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_recovery_scale(self, cloud_storage_type):
# This test requires dedicated system resources
assert self.redpanda.dedicated_nodes
14 changes: 8 additions & 6 deletions tests/rptest/scale_tests/franz_go_verifiable_test.py
@@ -10,7 +10,7 @@
import os

from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import matrix

from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
@@ -21,7 +21,7 @@
KgoVerifierRandomConsumer,
KgoVerifierConsumerGroupConsumer,
)
from rptest.services.redpanda import CloudStorageType, SISettings, RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda import CloudStorageType, SISettings, RESTART_LOG_ALLOW_LIST, get_cloud_storage_type
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.utils.mode_checks import skip_debug_mode

@@ -226,12 +226,14 @@ def with_timeboxed(self):

class KgoVerifierWithSiTestLargeSegments(KgoVerifierWithSiTest):
@cluster(num_nodes=4, log_allow_list=KGO_LOG_ALLOW_LIST)
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_si_without_timeboxed(self, cloud_storage_type):
self.without_timeboxed()

@cluster(num_nodes=4, log_allow_list=KGO_RESTART_LOG_ALLOW_LIST)
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_si_with_timeboxed(self, cloud_storage_type):
self.with_timeboxed()

@@ -240,11 +242,11 @@ class KgoVerifierWithSiTestSmallSegments(KgoVerifierWithSiTest):
segment_size = 20 * 2**20

@cluster(num_nodes=4, log_allow_list=KGO_LOG_ALLOW_LIST)
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_si_without_timeboxed(self, cloud_storage_type):
self.without_timeboxed()

@cluster(num_nodes=4, log_allow_list=KGO_RESTART_LOG_ALLOW_LIST)
@parametrize(cloud_storage_type=CloudStorageType.AUTO)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_si_with_timeboxed(self, cloud_storage_type):
self.with_timeboxed()
83 changes: 42 additions & 41 deletions tests/rptest/services/redpanda.py
@@ -154,8 +154,6 @@ class CloudStorageType(IntEnum):
S3 = 1
# Use Azure ABS on dedicated nodes, or azurite in docker
ABS = 2
# Auto-select the cloud's storage service on dedicated nodes, or use minio+S3 on docker
AUTO = 3


def one_or_many(value):
Expand All @@ -169,6 +167,46 @@ def one_or_many(value):
return value


def get_cloud_storage_type(applies_only_on: list(CloudStorageType) = None,
[Contributor review comment] Nit: the type hint doesn't match. Should be Optional[..]

docker_use_arbitrary=False):
"""
Returns a list(CloudStorageType) based on the "CLOUD_PROVIDER"
environment variable. For example:
CLOUD_PROVIDER=docker => returns: [CloudStorageType.S3, CloudStorageType.ABS]
CLOUD_PROVIDER=aws => returns: [CloudStorageType.S3]

:env "CLOUD_PROVIDER": one of "aws", "gcp", "azure" or "docker"
:param applies_only_on: optional list(CloudStorageType)
that is the allow-list of the cloud storage type for a
test.
If it's set, the function will return the intersection
of:
* <cloud_storage_type>: discovered based on the CLOUD_PROVIDER env
* <applies_only_on>: param provided
:param docker_use_arbitrary: optional bool to use an arbitrary backend when
the cloud provider is docker.
"""

if applies_only_on is None:
applies_only_on = []

cloud_provider = os.getenv("CLOUD_PROVIDER", "docker")
if cloud_provider == "docker":
if docker_use_arbitrary:
cloud_storage_type = [CloudStorageType.S3]
else:
cloud_storage_type = [CloudStorageType.S3, CloudStorageType.ABS]
elif cloud_provider in ("aws", "gcp"):
cloud_storage_type = [CloudStorageType.S3]
elif cloud_provider == "azure":
cloud_storage_type = [CloudStorageType.ABS]

if applies_only_on:
cloud_storage_type = list(
set(applies_only_on).intersection(cloud_storage_type))
return cloud_storage_type


class ResourceSettings:
"""
Control CPU+memory footprint of Redpanda instances. Pass one
@@ -279,8 +317,6 @@ class SISettings:
GLOBAL_ABS_SHARED_KEY = "abs_shared_key"
GLOBAL_CLOUD_PROVIDER = "cloud_provider"

DEDICATED_NODE_KEY = "dedicated_nodes"

# The account and key to use with local Azurite testing.
# These are the default Azurite (Azure emulator) storage account and shared key.
# Both are readily available in the docs.
@@ -316,30 +352,13 @@ def __init__(self,
quickly when they wait for uploads to complete.
"""

self.cloud_storage_type = CloudStorageType.AUTO
self.cloud_storage_type = get_cloud_storage_type()[0]
if hasattr(test_context, 'injected_args') \
and test_context.injected_args is not None \
and 'cloud_storage_type' in test_context.injected_args:
self.cloud_storage_type = test_context.injected_args[
'cloud_storage_type']

if self.cloud_storage_type == CloudStorageType.AUTO:
dedicated_nodes = test_context.globals.get(self.DEDICATED_NODE_KEY,
False)
if dedicated_nodes:
abs_shared_key = test_context.globals.get(
self.GLOBAL_ABS_SHARED_KEY, None)
s3_region = test_context.globals.get(self.GLOBAL_S3_REGION_KEY,
None)
if abs_shared_key is not None:
self.cloud_storage_type = CloudStorageType.ABS
elif s3_region is not None:
self.cloud_storage_type = CloudStorageType.S3
else:
raise RuntimeError("Cannot autodetect cloud storage")
else:
self.cloud_storage_type = CloudStorageType.S3

if self.cloud_storage_type == CloudStorageType.S3:
self.cloud_storage_credentials_source = cloud_storage_credentials_source
self.cloud_storage_access_key = cloud_storage_access_key
@@ -398,14 +417,6 @@ def _load_abs_context(self, logger, test_context):
self.endpoint_url = None
self.cloud_storage_disable_tls = False
self.cloud_storage_api_endpoint_port = 443
if test_context.globals.get(self.GLOBAL_S3_SECRET_KEY, None):
test_context.ok_to_fail = True

msg = (
"Test requested ABS cloud storage, but provided globals for Azure."
" Stopping and marking as OFAIL.")
logger.info(msg)
raise Exception(msg)
else:
logger.debug("Running in Dockerised env against Azurite. "
"Using Azurite defualt credentials.")
@@ -445,17 +456,7 @@ def _load_s3_context(self, logger, test_context):
self.cloud_storage_region = cloud_storage_region
self.cloud_storage_api_endpoint_port = 443
else:
if test_context.globals.get(self.GLOBAL_ABS_SHARED_KEY, None):
test_context.ok_to_fail = True

msg = (
"Test requested S3 cloud storage, but provided globals for Azure."
" Stopping and marking as OFAIL.")
logger.info(msg)
raise Exception(msg)
else:
logger.info(
'No AWS credentials supplied, assuming minio defaults')
logger.info('No AWS credentials supplied, assuming minio defaults')

@property
def cloud_storage_bucket(self):
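For reference, the provider-to-backend mapping implemented by get_cloud_storage_type above works out as follows (a sketch that follows directly from the function body; the environment manipulation is only for illustration):

```python
import os

from rptest.services.redpanda import CloudStorageType, get_cloud_storage_type

# Unset or "docker": both backends, unless the caller opts into a single
# arbitrary backend via docker_use_arbitrary=True (then S3/minio only).
os.environ["CLOUD_PROVIDER"] = "docker"
assert get_cloud_storage_type() == [CloudStorageType.S3, CloudStorageType.ABS]
assert get_cloud_storage_type(docker_use_arbitrary=True) == [CloudStorageType.S3]

# "aws" and "gcp" both map to the S3 backend; "azure" maps to ABS.
os.environ["CLOUD_PROVIDER"] = "aws"
assert get_cloud_storage_type() == [CloudStorageType.S3]

os.environ["CLOUD_PROVIDER"] = "azure"
assert get_cloud_storage_type() == [CloudStorageType.ABS]
```
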
5 changes: 2 additions & 3 deletions tests/rptest/tests/adjacent_segment_merging_test.py
@@ -8,7 +8,7 @@

from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import CloudStorageType, SISettings
from rptest.services.redpanda import CloudStorageType, SISettings, get_cloud_storage_type

from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
@@ -70,8 +70,7 @@ def tearDown(self):
super().tearDown()

@cluster(num_nodes=3)
@matrix(acks=[-1, 0, 1],
cloud_storage_type=[CloudStorageType.ABS, CloudStorageType.S3])
@matrix(acks=[-1, 0, 1], cloud_storage_type=get_cloud_storage_type())
def test_reupload_of_local_segments(self, acks, cloud_storage_type):
"""Test adjacent segment merging using using local data.
The test starts by uploading large number of very small segments.
32 changes: 11 additions & 21 deletions tests/rptest/tests/archival_test.py
@@ -9,7 +9,7 @@
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import CloudStorageType, RedpandaService, SISettings
from rptest.services.redpanda import CloudStorageType, RedpandaService, SISettings, get_cloud_storage_type

from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
@@ -182,17 +182,15 @@ def tearDown(self):
super().tearDown()

@cluster(num_nodes=3)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_write(self, cloud_storage_type):
"""Simpe smoke test, write data to redpanda and check if the
data hit the S3 storage bucket"""
self.kafka_tools.produce(self.topic, 10000, 1024)
validate(self._quick_verify, self.logger, 90)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_isolate(self, cloud_storage_type):
"""Verify that our isolate/rejoin facilities actually work"""

@@ -245,8 +243,7 @@ def data_uploaded():
err_msg="Data not uploaded after firewall unblocked")

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_reconnect(self, cloud_storage_type):
"""Disconnect redpanda from S3, write data, connect redpanda to S3
and check that the data is uploaded"""
@@ -259,8 +256,7 @@ def test_reconnect(self, cloud_storage_type):
validate(self._quick_verify, self.logger, 90)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_one_node_reconnect(self, cloud_storage_type):
"""Disconnect one redpanda node from S3, write data, connect redpanda to S3
and check that the data is uploaded"""
@@ -275,8 +271,7 @@ def test_one_node_reconnect(self, cloud_storage_type):
validate(self._quick_verify, self.logger, 90)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_connection_drop(self, cloud_storage_type):
"""Disconnect redpanda from S3 during the active upload, restore connection
and check that everything is uploaded"""
@@ -289,8 +284,7 @@ def test_connection_drop(self, cloud_storage_type):
validate(self._quick_verify, self.logger, 90)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_connection_flicker(self, cloud_storage_type):
"""Disconnect redpanda from S3 during the active upload for short period of time
during upload and check that everything is uploaded"""
@@ -308,8 +302,7 @@ def test_connection_flicker(self, cloud_storage_type):
validate(self._quick_verify, self.logger, 90)

@cluster(num_nodes=3)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_single_partition_leadership_transfer(self, cloud_storage_type):
"""Start uploading data, restart leader node of the partition 0 to trigger the
leadership transfer, continue upload, verify S3 bucket content"""
@@ -325,8 +318,7 @@ def test_single_partition_leadership_transfer(self, cloud_storage_type):
validate(self._cross_node_verify, self.logger, 90)

@cluster(num_nodes=3)
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_all_partitions_leadership_transfer(self, cloud_storage_type):
"""Start uploading data, restart leader nodes of all partitions to trigger the
leadership transfer, continue upload, verify S3 bucket content"""
@@ -343,8 +335,7 @@ def test_all_partitions_leadership_transfer(self, cloud_storage_type):
validate(self._cross_node_verify, self.logger, 90)

@cluster(num_nodes=3)
@matrix(acks=[-1, 0, 1],
cloud_storage_type=[CloudStorageType.ABS, CloudStorageType.S3])
@matrix(acks=[-1, 0, 1], cloud_storage_type=get_cloud_storage_type())
def test_timeboxed_uploads(self, acks, cloud_storage_type):
"""This test checks segment upload time limit. The feature is enabled in the
configuration. The configuration defines maximum time interval between uploads.
@@ -422,8 +413,7 @@ def check_upload():
validate(check_upload, self.logger, 90)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@matrix(acks=[1, -1],
cloud_storage_type=[CloudStorageType.ABS, CloudStorageType.S3])
@matrix(acks=[1, -1], cloud_storage_type=get_cloud_storage_type())
def test_retention_archival_coordination(self, acks, cloud_storage_type):
"""
Test that only archived segments can be evicted and that eviction
9 changes: 4 additions & 5 deletions tests/rptest/tests/cloud_retention_test.py
@@ -16,7 +16,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.services.redpanda import CloudStorageType, SISettings, MetricsEndpoint, CloudStorageType, CHAOS_LOG_ALLOW_LIST
from rptest.services.redpanda import CloudStorageType, SISettings, MetricsEndpoint, CloudStorageType, CHAOS_LOG_ALLOW_LIST, get_cloud_storage_type
from rptest.services.kgo_verifier_services import (
KgoVerifierConsumerGroupConsumer, KgoVerifierProducer)
from rptest.utils.mode_checks import skip_debug_mode
@@ -40,7 +40,7 @@ def setUp(self):

@cluster(num_nodes=4)
@matrix(max_consume_rate_mb=[20, None],
cloud_storage_type=[CloudStorageType.ABS, CloudStorageType.S3])
cloud_storage_type=get_cloud_storage_type())
@skip_debug_mode
def test_cloud_retention(self, max_consume_rate_mb, cloud_storage_type):
"""
@@ -156,7 +156,7 @@ def check_bucket_size():

@cluster(num_nodes=4)
@skip_debug_mode
@matrix(cloud_storage_type=[CloudStorageType.ABS, CloudStorageType.S3])
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_gc_entire_manifest(self, cloud_storage_type):
"""
Regression test for #8945, where GCing all cloud segments could prevent
@@ -279,8 +279,7 @@ def __init__(self, test_context):

@cluster(num_nodes=4, log_allow_list=CHAOS_LOG_ALLOW_LIST)
@skip_debug_mode
@parametrize(cloud_storage_type=CloudStorageType.ABS)
@parametrize(cloud_storage_type=CloudStorageType.S3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_retention_with_node_failures(self, cloud_storage_type):
max_overshoot_percentage = 100
runtime = 120
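Tests that only make sense on one backend can narrow the computed matrix with applies_only_on; a hypothetical sketch (the class, method, and node count are illustrative, not part of this PR):

```python
from ducktape.mark import matrix
from rptest.services.cluster import cluster
from rptest.services.redpanda import CloudStorageType, get_cloud_storage_type
from rptest.tests.redpanda_test import RedpandaTest


class S3OnlyExampleTest(RedpandaTest):
    @cluster(num_nodes=3)
    @matrix(cloud_storage_type=get_cloud_storage_type(
        applies_only_on=[CloudStorageType.S3]))
    def test_s3_specific_behaviour(self, cloud_storage_type):
        # On CLOUD_PROVIDER=azure the intersection is empty, so the matrix
        # expands to no cases and the test is effectively skipped there.
        ...
```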