
Commit

Merge pull request redpanda-data#23015 from vbotbuildovich/backport-pr-22946-v24.2.x-801

[v24.2.x] [rptest] always clean up object stores (that are small) for non-GCP
jackietung-redpanda authored Aug 23, 2024
2 parents 61b92d3 + 7b86a28 commit eecac8c
Showing 1 changed file with 63 additions and 4 deletions.
67 changes: 63 additions & 4 deletions tests/rptest/services/redpanda.py
@@ -69,13 +69,23 @@
from rptest.services.utils import NodeCrash, LogSearchLocal, LogSearchCloud, Stopwatch
from rptest.util import inject_remote_script, ssh_output_stderr, wait_until_result
from rptest.utils.allow_logs_on_predicate import AllowLogsOnPredicate
import enum

Partition = collections.namedtuple('Partition',
['topic', 'index', 'leader', 'replicas'])

MetricSample = collections.namedtuple(
'MetricSample', ['family', 'sample', 'node', 'value', 'labels'])


class CloudStorageCleanupStrategy(enum.Enum):
    # I.e. if we are using a lifecycle rule, then we do NOT clean the bucket
    IF_NOT_USING_LIFECYCLE_RULE = "IF_NOT_USING_LIFECYCLE_RULE"

    # Ignore large buckets (based on number of objects). For small buckets, ALWAYS clean.
    ALWAYS_SMALL_BUCKETS_ONLY = "ALWAYS_SMALL_BUCKETS_ONLY"


SaslCredentials = redpanda_types.SaslCredentials

# Map of path -> (checksum, size)
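For orientation, here is a minimal sketch (not part of the diff) of what the two strategies mean in practice; should_delete_bucket, lifecycle_rule_in_use and object_count are hypothetical names used only for illustration:

def should_delete_bucket(strategy, lifecycle_rule_in_use, object_count,
                         max_object_count=3000):
    # Hypothetical helper mirroring the semantics of the enum above.
    if strategy is CloudStorageCleanupStrategy.IF_NOT_USING_LIFECYCLE_RULE:
        # A lifecycle rule expires objects on its own, so skip cleanup here.
        return not lifecycle_rule_in_use
    if strategy is CloudStorageCleanupStrategy.ALWAYS_SMALL_BUCKETS_ONLY:
        # Always clean small buckets; leave large ones to lifecycle rules.
        return object_count <= max_object_count
    raise ValueError(f"Unimplemented cleanup strategy {strategy}")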
@@ -536,6 +546,10 @@ def __init__(self,
CloudStorageType, test_context.
injected_args['cloud_storage_type_and_url_style'][0])

# For most cloud storage backends, we clean up small buckets always.
# In some cases, we will modify this strategy in the block below.
self.cloud_storage_cleanup_strategy = CloudStorageCleanupStrategy.ALWAYS_SMALL_BUCKETS_ONLY

if self.cloud_storage_type == CloudStorageType.S3:
self.cloud_storage_credentials_source = cloud_storage_credentials_source
self.cloud_storage_access_key = cloud_storage_access_key
@@ -551,6 +565,10 @@
elif test_context.globals.get(self.GLOBAL_CLOUD_PROVIDER,
'aws') == 'gcp':
self.cloud_storage_api_endpoint = 'storage.googleapis.com'
# For GCP, we currently use S3 compat API over boto3, which does not support batch deletes
# This makes cleanup slow, even for small-ish buckets.
# Therefore, we maintain logic of skipping bucket cleaning completely, if we are using lifecycle rule.
self.cloud_storage_cleanup_strategy = CloudStorageCleanupStrategy.IF_NOT_USING_LIFECYCLE_RULE
else:
# Endpoint defaults to minio-s3
self.cloud_storage_api_endpoint = 'minio-s3'
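The GCP comment above refers to the lack of multi-object (batch) deletes on GCS's S3-compatible endpoint. As a rough illustration (not code from this PR; the client and bucket names are placeholders), batch deletion removes up to 1000 keys per request via boto3's delete_objects, whereas the per-object fallback needs one HTTP round trip per key, which is what makes emptying even mid-sized buckets slow:

import boto3

def empty_bucket(s3, bucket, use_batch_delete=True):
    # Walk every page of the listing and delete the keys it contains.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        keys = [obj["Key"] for obj in page.get("Contents", [])]
        if not keys:
            continue
        if use_batch_delete:
            # One request deletes up to 1000 keys (works against AWS S3 and
            # MinIO; GCS's S3-compat XML API does not support this request).
            s3.delete_objects(Bucket=bucket,
                              Delete={
                                  "Objects": [{"Key": k} for k in keys],
                                  "Quiet": True
                              })
        else:
            # Per-object deletes: one round trip per key.
            for k in keys:
                s3.delete_object(Bucket=bucket, Key=k)

A client pointed at GCS (e.g. boto3.client("s3", endpoint_url="https://storage.googleapis.com")) would have to take the per-object path, hence the lifecycle-rule escape hatch in the constructor above.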
@@ -589,6 +607,7 @@ def __init__(self,
self.endpoint_url = f'http://{self.cloud_storage_api_endpoint}:{self.cloud_storage_api_endpoint_port}'
self.bypass_bucket_creation = bypass_bucket_creation
self.use_bucket_cleanup_policy = use_bucket_cleanup_policy

self.cloud_storage_housekeeping_interval_ms = cloud_storage_housekeeping_interval_ms
self.cloud_storage_spillover_manifest_max_segments = cloud_storage_spillover_manifest_max_segments
self.retention_local_strict = retention_local_strict
@@ -3220,21 +3239,61 @@ def cloud_storage_client(self):
return self._cloud_storage_client

    def delete_bucket_from_si(self):
        self.logger.info(
            f"cloud_storage_cleanup_strategy = {self.si_settings.cloud_storage_cleanup_strategy}"
        )

        if self.si_settings.cloud_storage_cleanup_strategy == CloudStorageCleanupStrategy.ALWAYS_SMALL_BUCKETS_ONLY:
            bucket_is_small = True
            max_object_count = 3000

            # See if the bucket is small enough
            t = time.time()
            for i, m in enumerate(
                    self.cloud_storage_client.list_objects(
                        self.si_settings.cloud_storage_bucket)):
                if i >= max_object_count:
                    bucket_is_small = False
                    break
            self.logger.info(
                f"Determining bucket count for {self.si_settings.cloud_storage_bucket} up to {max_object_count} objects took {time.time() - t}s"
            )
            if bucket_is_small:
                # Log grep hint: "a small bucket"
                self.logger.info(
                    f"Bucket {self.si_settings.cloud_storage_bucket} is a small bucket (deleting it)"
                )
            else:
                self.logger.info(
                    f"Bucket {self.si_settings.cloud_storage_bucket} is NOT a small bucket (NOT deleting it)"
                )
                return

        elif self.si_settings.cloud_storage_cleanup_strategy == CloudStorageCleanupStrategy.IF_NOT_USING_LIFECYCLE_RULE:
            if self.si_settings.use_bucket_cleanup_policy:
                self.logger.info(
                    f"Skipping deletion of bucket/container: {self.si_settings.cloud_storage_bucket}. "
                    "Using a cleanup policy instead.")
                return
        else:
            raise ValueError(
                f"Unimplemented cloud storage cleanup strategy {self.si_settings.cloud_storage_cleanup_strategy}"
            )

        self.logger.debug(
            f"Deleting bucket/container: {self.si_settings.cloud_storage_bucket}"
        )

        assert self.si_settings.cloud_storage_bucket, f"missing bucket : {self.si_settings.cloud_storage_bucket}"
        t = time.time()
        self.cloud_storage_client.empty_and_delete_bucket(
            self.si_settings.cloud_storage_bucket,
            parallel=self.dedicated_nodes)

        self.logger.info(
            f"Emptying and deleting bucket {self.si_settings.cloud_storage_bucket} took {time.time() - t}s"
        )

def get_objects_from_si(self):
assert self.cloud_storage_client and self._si_settings and self._si_settings.cloud_storage_bucket, \
f"bad si config {self.cloud_storage_client} : {self._si_settings.cloud_storage_bucket if self._si_settings else self._si_settings}"
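As a side note, the bounded listing in delete_bucket_from_si (enumerate plus an early break) can be expressed as a small standalone helper. A sketch assuming any iterable of listed objects; the names here are illustrative, not from the PR:

import itertools

def is_small_bucket(listed_objects, max_object_count=3000):
    # Pull at most max_object_count + 1 items; if the extra one exists,
    # the bucket is "large" and listing stops without walking everything.
    probe = itertools.islice(listed_objects, max_object_count + 1)
    return sum(1 for _ in probe) <= max_object_count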
