Skip to content

Commit

Permalink
Merge pull request redpanda-data#12936 from VladLazar/hns-followups
Browse files Browse the repository at this point in the history
cloud_storage: follow-ups and test fixes for Azure HNS
  • Loading branch information
piyushredpanda authored Aug 30, 2023
2 parents 66eef6f + 5232066 commit 5db425f
Show file tree
Hide file tree
Showing 19 changed files with 69 additions and 82 deletions.
7 changes: 6 additions & 1 deletion src/v/cloud_storage_clients/client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,15 @@ ss::future<> client_pool::stop() {
// Wait until all leased objects are returned
co_await _gate.close();

std::vector<ss::future<>> stops;
stops.reserve(_pool.size());

for (auto& it : _pool) {
co_await it->stop();
stops.emplace_back(it->stop());
}

co_await ss::when_all_succeed(stops.begin(), stops.end());

vlog(pool_log.info, "Stopped client pool");
_probe = nullptr;
}
Expand Down
5 changes: 5 additions & 0 deletions src/v/cloud_storage_clients/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ ss::future<abs_configuration> abs_configuration::make_configuration(
return ssx::sformat("{}.blob.core.windows.net", storage_account_name());
}();

// The ABS TLS server misbehaves and does not send an EOF
// when prompted to close the connection. Thus, skip the wait
// in order to avoid Seastar's hardcoded 10s wait.
client_cfg.wait_for_tls_server_eof = false;

client_cfg.tls_sni_hostname = endpoint_uri;
client_cfg.storage_account_name = storage_account_name;
client_cfg.shared_key = shared_key;
Expand Down
7 changes: 5 additions & 2 deletions src/v/net/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ base_transport::base_transport(configuration c)
: _probe(std::make_unique<client_probe>())
, _server_addr(c.server_addr)
, _creds(c.credentials)
, _tls_sni_hostname(c.tls_sni_hostname) {}
, _tls_sni_hostname(c.tls_sni_hostname)
, _wait_for_tls_server_eof(c.wait_for_tls_server_eof) {}

ss::future<> base_transport::do_connect(clock_type::time_point timeout) {
// hold invariant of having an always valid dispatch gate
Expand All @@ -52,7 +53,9 @@ ss::future<> base_transport::do_connect(clock_type::time_point timeout) {
fd = co_await ss::tls::wrap_client(
_creds,
std::move(fd),
_tls_sni_hostname ? *_tls_sni_hostname : ss::sstring{});
_tls_sni_hostname ? *_tls_sni_hostname : ss::sstring{},
ss::tls::tls_options{
.wait_for_eof_on_shutdown = _wait_for_tls_server_eof});
}
_fd = std::make_unique<ss::connected_socket>(std::move(fd));
_probe->connection_established();
Expand Down
3 changes: 3 additions & 0 deletions src/v/net/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class base_transport {
= net::public_metrics_disabled::no;
/// Optional server name indication (SNI) for TLS connection
std::optional<ss::sstring> tls_sni_hostname;
/// Whether to wait for the server's EOF after the BYE message on TLS session end; set to false to skip the wait
bool wait_for_tls_server_eof = true;
};

explicit base_transport(configuration c);
Expand Down Expand Up @@ -93,6 +95,7 @@ class base_transport {
unresolved_address _server_addr;
ss::shared_ptr<ss::tls::certificate_credentials> _creds;
std::optional<ss::sstring> _tls_sni_hostname;
bool _wait_for_tls_server_eof;

// Track if shutdown was called on the current `_fd`
bool _shutdown{false};
Expand Down
3 changes: 3 additions & 0 deletions tests/rptest/archival/abs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def list_objects(self,
self.logger.debug(f"Skip {blob_props.name} for {topic}")
continue

if blob_props.content_settings.content_md5 is None:
continue

yield ObjectMetadata(
bucket=blob_props.container,
key=blob_props.name,
Expand Down
4 changes: 3 additions & 1 deletion tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@
# e.g. raft - [group_id:3, {kafka/topic/2}] consensus.cc:2317 - unable to replicate updated configuration: raft::errc::replicated_entry_truncated
"raft - .*unable to replicate updated configuration: .*",
# e.g. recovery_stm.cc:432 - recovery append entries error: rpc::errc::client_request_timeout"
"raft - .*recovery append entries error.*client_request_timeout"
"raft - .*recovery append entries error.*client_request_timeout",
# Redpanda versions prior to v23.2 don't know how to interact with HNS Storage Accounts correctly
"abs - .*FeatureNotYetSupportedForHierarchicalNamespaceAccounts"
]

# Path to the LSAN suppressions file
Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/adjacent_segment_merging_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ def __init__(self, test_context):
def setUp(self):
super().setUp() # topic is created here

def tearDown(self):
self.cloud_storage_client.empty_bucket(self.bucket_name)
super().tearDown()

@cluster(num_nodes=3)
@matrix(acks=[-1, 1], cloud_storage_type=get_cloud_storage_type())
def test_reupload_of_local_segments(self, acks, cloud_storage_type):
Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/archival_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ def setUp(self):
self.rpk.alter_topic_config(topic.name, 'redpanda.remote.write',
'true')

def tearDown(self):
self.cloud_storage_client.empty_bucket(self.s3_bucket_name)
super().tearDown()

@cluster(num_nodes=3)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_write(self, cloud_storage_type):
Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/cloud_storage_chunk_read_path_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ def setup(self):
# Do not start redpanda here, let the tests start with custom config options
pass

def teardown(self):
self.redpanda.cloud_storage_client.empty_bucket(
self.si_settings.cloud_storage_bucket)

def _set_params_and_start_redpanda(self, **kwargs):
if kwargs:
self.extra_rp_conf.update(kwargs)
Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ def setUp(self):
for topic in self.topics:
self.kafka_tools.create_topic(topic)

def tearDown(self):
assert self.redpanda and self.redpanda.cloud_storage_client
self.redpanda.cloud_storage_client.empty_bucket(self.s3_bucket_name)


def num_manifests_uploaded(test_self):
s = test_self.redpanda.metric_sum(
Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/follower_fetching_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ def __init__(self, test_context):
},
si_settings=si_settings)

def tearDown(self):
self.cloud_storage_client.empty_bucket(self.s3_bucket_name)
super().tearDown()

def setUp(self):
# Delay startup, so that the test case can configure redpanda
# based on test parameters before starting it.
Expand Down
11 changes: 8 additions & 3 deletions tests/rptest/tests/license_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import time

from ducktape.utils.util import wait_until
from ducktape.mark import matrix
from rptest.utils.rpenv import sample_license
from rptest.services.admin import Admin
from ducktape.utils.util import wait_until
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import SISettings
from rptest.services.redpanda import SISettings, CloudStorageType, get_cloud_storage_type
from rptest.services.cluster import cluster
from requests.exceptions import HTTPError
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
Expand Down Expand Up @@ -49,7 +50,9 @@ def setUp(self):
super(UpgradeToLicenseChecks, self).setUp()

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_basic_upgrade(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_basic_upgrade(self, cloud_storage_type):
# Modified environment variables apply to processes restarted from this point onwards
self.redpanda.set_environment({
'__REDPANDA_LICENSE_CHECK_INTERVAL_SEC':
Expand Down Expand Up @@ -129,7 +132,9 @@ def setUp(self):
super(UpgradeMigratingLicenseVersion, self).setUp()

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_license_upgrade(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_license_upgrade(self, cloud_storage_type):
license = sample_license()
if license is None:
self.logger.info(
Expand Down
12 changes: 9 additions & 3 deletions tests/rptest/tests/read_replica_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ def _bucket_delta(self, bu1: BucketUsage,
return None

@cluster(num_nodes=7, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
@matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
@matrix(
partition_count=[5],
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_identical_lwms_after_delete_records(
self, partition_count: int,
cloud_storage_type: CloudStorageType) -> None:
Expand Down Expand Up @@ -322,7 +324,9 @@ def clusters_report_identical_lwms():
check_lwm(7)

@cluster(num_nodes=8, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
@matrix(partition_count=[5], cloud_storage_type=[CloudStorageType.S3])
@matrix(
partition_count=[5],
cloud_storage_type=get_cloud_storage_type(docker_use_arbitrary=True))
def test_identical_hwms(self, partition_count: int,
cloud_storage_type: CloudStorageType) -> None:
self._setup_read_replica(partition_count=partition_count,
Expand Down Expand Up @@ -460,7 +464,9 @@ def __init__(self, test_context: TestContext):
self.second_cluster = None

@cluster(num_nodes=8)
def test_upgrades(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_upgrades(self, cloud_storage_type):
partition_count = 1
install_opts = InstallOptions(install_previous_version=True)
self.start_redpanda(3,
Expand Down
3 changes: 2 additions & 1 deletion tests/rptest/tests/services_self_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def __init__(self, test_context, *args, **kwargs):
@skip_debug_mode # We wait for a decent amount of traffic
@cluster(num_nodes=4)
#@matrix(cloud_storage_type=get_cloud_storage_type())
@matrix(cloud_storage_type=[CloudStorageType.S3])
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_missing_segment(self, cloud_storage_type):
topic = 'test'

Expand Down
4 changes: 0 additions & 4 deletions tests/rptest/tests/shadow_indexing_admin_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ def setUp(self):
# feature on after start.
self.redpanda.set_cluster_config({'admin_api_require_auth': True})

def tearDown(self):
self.cloud_storage_client.empty_bucket(self.s3_bucket_name)
super().tearDown()

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_bucket_validation(self, cloud_storage_type):
Expand Down
21 changes: 13 additions & 8 deletions tests/rptest/tests/topic_creation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.kafka_cat import KafkaCat
from rptest.services.producer_swarm import ProducerSwarm
from rptest.services.redpanda import ResourceSettings, SISettings
from rptest.services.redpanda import ResourceSettings, SISettings, CloudStorageType, get_cloud_storage_type
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.services.rpk_producer import RpkProducer
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.util import wait_for_local_storage_truncate, expect_exception
from rptest.utils.mode_checks import skip_azure_blob_storage
from rptest.clients.kcl import KCL

from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -560,8 +559,10 @@ def _populate_tiered_storage_topic(self, topic_name, local_retention):
target_bytes=local_retention)

@cluster(num_nodes=3)
@skip_azure_blob_storage
def test_cloud_storage_sticky_enablement_v22_2_to_v22_3(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_cloud_storage_sticky_enablement_v22_2_to_v22_3(
self, cloud_storage_type):
"""
In Redpanda 22.3, the cluster defaults for cloud storage change
from being applied at runtime to being sticky at creation time,
Expand Down Expand Up @@ -628,8 +629,10 @@ def test_cloud_storage_sticky_enablement_v22_2_to_v22_3(self):
assert described['redpanda.remote.read'] == ('false', 'DEFAULT_CONFIG')

@cluster(num_nodes=3)
@skip_azure_blob_storage
def test_retention_config_on_upgrade_from_v22_2_to_v22_3(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_retention_config_on_upgrade_from_v22_2_to_v22_3(
self, cloud_storage_type):
self.install_and_start()

self.rpk.create_topic("test-topic-with-retention",
Expand Down Expand Up @@ -814,8 +817,10 @@ def is_empty():
assert len(deleted_objects) == 0

@cluster(num_nodes=3)
@skip_azure_blob_storage
def test_retention_upgrade_with_cluster_remote_write(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_retention_upgrade_with_cluster_remote_write(
self, cloud_storage_type):
"""
Validate how the cluster-wide cloud_storage_enable_remote_write
is handled on upgrades from <=22.2
Expand Down
10 changes: 5 additions & 5 deletions tests/rptest/tests/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from collections import defaultdict
from packaging.version import Version

from ducktape.mark import parametrize
from ducktape.mark import parametrize, matrix
from ducktape.utils.util import wait_until
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool
Expand All @@ -28,9 +28,8 @@
wait_until_segments,
)
from rptest.utils.si_utils import BucketView
from rptest.utils.mode_checks import skip_azure_blob_storage
from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
from rptest.services.redpanda import SISettings, CloudStorageType, get_cloud_storage_type
from rptest.services.kgo_verifier_services import (
KgoVerifierProducer,
KgoVerifierSeqConsumer,
Expand Down Expand Up @@ -380,8 +379,9 @@ def install_and_start(self):
super().setUp()

@cluster(num_nodes=4, log_allow_list=RESTART_LOG_ALLOW_LIST)
@skip_azure_blob_storage
def test_rolling_upgrade(self):
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_rolling_upgrade(self, cloud_storage_type):
"""
Verify that when tiered storage writes happen during a rolling upgrade,
we continue to write remote content that old versions can read, until
Expand Down
9 changes: 7 additions & 2 deletions tests/rptest/tests/workload_upgrade_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from rptest.services.redpanda import SISettings
from rptest.services.redpanda import SISettings, CloudStorageType, get_cloud_storage_type
from rptest.services.redpanda_installer import RedpandaInstaller, RedpandaVersion, RedpandaVersionTriple
from rptest.services.workload_protocol import PWorkload
from rptest.tests.prealloc_nodes import PreallocNodesTest
Expand All @@ -22,6 +22,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.workload_license import LicenseWorkload
from rptest.utils.mode_checks import skip_debug_mode
from ducktape.mark import matrix


def expand_version(
Expand Down Expand Up @@ -248,7 +249,11 @@ def cluster_version(self) -> int:

@skip_debug_mode
@cluster(num_nodes=4)
def test_workloads_through_releases(self):
# TODO(vlad): Allow this test on ABS once we have at least two versions
# of Redpanda that support Azure Hierarchical Namespaces.
@matrix(cloud_storage_type=get_cloud_storage_type(
applies_only_on=[CloudStorageType.S3]))
def test_workloads_through_releases(self, cloud_storage_type):
# this callback will be called between each upgrade, in a mixed version state
def mid_upgrade_check(raw_versions: dict[Any, RedpandaVersion]):
rp_versions = {
Expand Down
32 changes: 0 additions & 32 deletions tests/rptest/utils/mode_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,3 @@ def f(*args, **kwargs):
return func(*args, **kwargs)

return f


def skip_azure_blob_storage(func):
"""
Decorator applied to a test class method. The property `azure_blob_storage` should be present
on the object.
If set to true, the wrapped function call is skipped, and a cleanup action
is performed instead.
If set to false, the wrapped function (usually a test case) is called.
"""
@functools.wraps(func)
def f(*args, **kwargs):
assert args, 'skip_azure_blob_storage must be placed on a test method in a class'

caller = args[0]

assert hasattr(
caller, 'azure_blob_storage'
), 'skip_azure_blob_storage called on object which does not have azure_blob_storage attribute'
assert hasattr(
caller, 'logger'
), 'skip_azure_blob_storage called on object which has no logger'
if caller.azure_blob_storage:
caller.logger.info(
"Skipping Azure Blob Storage test in (requires S3)")
cleanup_on_early_exit(caller)
return None
return func(*args, **kwargs)

return f

0 comments on commit 5db425f

Please sign in to comment.