
ci: Add util to test structure to clean cloudv2 after testrun #14255

Merged: 5 commits, Oct 24, 2023
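
This PR adds a standalone cleanup utility (tests/rp_cloud_cleanup.py) and a small CloudV2 REST client (tests/rptest/services/provider_clients/rpcloud_client.py). The script loads ./globals.json, filters CloudV2 namespaces created by test runs, and deletes stale ones; cluster, network and peering deletion is wired up but disabled for now. A minimal sketch of programmatic use, equivalent to running the script directly (it assumes a globals.json with a "cloud_cluster" section in the working directory, and that the script's directory is importable):

import logging

from rp_cloud_cleanup import CloudCleanup

# Same effect as `python rp_cloud_cleanup.py`, but with DEBUG logging.
cleaner = CloudCleanup(log_level=logging.DEBUG)
cleaner.clean_namespaces()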
215 changes: 215 additions & 0 deletions tests/rp_cloud_cleanup.py
@@ -0,0 +1,215 @@
import json
import logging
import re
import os
import sys
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from rptest.services.provider_clients.rpcloud_client import RpCloudApiClient
from rptest.services.redpanda_cloud import CloudClusterConfig, ns_name_date_fmt, \
ns_name_prefix

gconfig_path = './'


def setupLogger(level):
root = logging.getLogger()
root.setLevel(level)
# Console
cli = logging.StreamHandler(sys.stdout)
cli.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
cli.setFormatter(formatter)
root.addHandler(cli)

return root


class CloudCleanup():
    _ns_pattern = ns_name_prefix
    # At this point everything is disabled except namespaces
delete_clusters = False
delete_peerings = False
delete_networks = False
delete_namespaces = True

    ns_regex = re.compile(r"^(?P<prefix>" + ns_name_prefix + r")"
                          r"(?P<date>\d{4}-\d{2}-\d{2}-\d{6}-)?"
                          r"(?P<id>[a-zA-Z0-9]{8})$")

def __init__(self, log_level=logging.INFO):
self.log = setupLogger(log_level)
self.log.info("# CloudV2 resourse cleanup util")
# Load config
ducktape_globals = self.load_globals(gconfig_path)

# Serialize it to class
if "cloud_cluster" not in ducktape_globals:
            self.log.warning("# WARNING: Cloud cluster configuration "
                             "not present, exiting")
sys.exit(1)
self.config = CloudClusterConfig(**ducktape_globals['cloud_cluster'])

# Init the REST client
self.cloudv2 = RpCloudApiClient(self.config, self.log)

def load_globals(self, path):
_gconfig = {}
_gpath = os.path.join(path, "globals.json")
try:
with open(_gpath, "r") as gf:
_gconfig = json.load(gf)
except Exception as e:
self.log.error(f"# ERROR: globals.json not found; {e}")
return _gconfig

def _delete_peerings(self, pool, queue):
if not self.delete_peerings:
self.log.info(f"# {len(queue)} network peerings could be deleted")
else:
self.log.info(f"# Deleting {len(queue)} network peerings")
for r in pool.map(self.cloudv2.delete_resource, queue):
                # TODO: Check against a real peering network delete response
                if not r:
                    self.log.warning("# Network peering delete request "
                                     "failed; no response received")

def _delete_clusters(self, pool, queue):
if not self.delete_clusters:
self.log.info(f"# {len(queue)} clusters could be deleted")
else:
self.log.info(f"# Deleting {len(queue)} clusters")
for r in pool.map(self.cloudv2.delete_resource, queue):
if r['state'] != 'deleted':
self.log.warning(f"# Cluster '{r['name']}' not deleted")

def _delete_networks(self, pool, queue):
if not self.delete_networks:
self.log.info(f"# {len(queue)} networks could be deleted")
else:
self.log.info(f"# Deleting {len(queue)} networks")
for r in pool.map(self.cloudv2.delete_resource, queue):
if r['state'] != 'deleted':
self.log.warning(f"# Network '{r['name']}' not deleted")

def _delete_namespaces(self, pool, queue):
if not self.delete_namespaces:
self.log.info(f"# {len(queue)} namespaces could be deleted")
else:
self.log.info(f"# Deleting {len(queue)} namespaces")
for r in pool.map(self.cloudv2.delete_resource, queue):
if not r['deleted']:
self.log.warning(f"# Namespace '{r['name']}' not deleted")

def clean_namespaces(self):
"""
Function lists non-deleted namespaces and hierachically deletes
clusters, networks and network-peerings if any
"""
self.log.info("# Listing namespaces")
# Items to delete
cl_queue = []
net_queue = []
npr_queue = []
ns_queue = []

# Get namespaces
ns_list = self.cloudv2.list_namespaces()
self.log.info(f" {len(ns_list)} total namespaces found. "
f"Filtering with '{self._ns_pattern}*'")
# filter namespaces according to 'ns_name_prefix'
ns_list = [
n for n in ns_list if n['name'].startswith(self._ns_pattern)
]
# Processing namespaces
self.log.info(
f"# Searching for resources in {len(ns_list)} namespaces")
        # Figure out which time was 36h ago
back_36h = datetime.now() - timedelta(hours=36)
for ns in ns_list:
# Filter out according to dates in name
# Detect date
            ns_match = self.ns_regex.match(ns['name'])
            if ns_match is None:
                self.log.info(f"  skipped '{ns['name']}', "
                              "name does not match the namespace pattern")
                continue
            date = ns_match['date']
if date is not None:
# Parse date into datetime object
ns_creation_date = datetime.strptime(date, ns_name_date_fmt)
if ns_creation_date > back_36h:
self.log.info(f" skipped '{ns['name']}', "
"36h delay not passed")
continue
# Check which ones are empty
            # The areas checked are clusters, networks and network peerings
clusters = self.cloudv2.list_clusters(ns_uuid=ns['id'])
networks = self.cloudv2.list_networks(ns_uuid=ns['id'])
# check if any peerings exists
peerings = []
for net in networks:
peerings += self.cloudv2.list_network_peerings(
net['id'], ns_uuid=ns['id'])
# Calculate existing resources for this namespace
counts = [len(clusters), len(networks), len(peerings)]
if any(counts):
# Announce counts
self.log.warning(f"# WARNING: Namespace '{ns['name']}' "
f"not empty: {counts[0]} clusters,"
f" {counts[1]} networks,"
f" {counts[2]} peering connections")
# TODO: Prepare needed data for peerings
if counts[2]:
for peernet in peerings:
# Add peerings delete handle to own list
npr_queue += [
self.cloudv2.network_peering_endpoint(
id=peernet['net_id'], peering_id=peernet['id'])
]
# TODO: Prepare needed data for clusters
if counts[0]:
for cluster in clusters:
# TODO: Check creation date
# Add cluster delete handle to own list
cl_queue += [
self.cloudv2.cluster_endpoint(cluster['id'])
]
# TODO: Prepare needed data for networks
if counts[1]:
for net in networks:
# Add network delete handle to own list
net_queue += [self.cloudv2.network_endpoint(net['id'])]
# Add ns delete handle to the list
ns_queue += [self.cloudv2.namespace_endpoint(uuid=ns['id'])]
        # Use a ThreadPool.
        # Deletion can only be done in this order:
        # peerings -> clusters -> networks -> namespaces
pool = ThreadPoolExecutor(max_workers=5)

# Delete network peerings
self._delete_peerings(pool, npr_queue)

# Delete clusters
self._delete_clusters(pool, cl_queue)

# Delete orphaned networks
self._delete_networks(pool, net_queue)

# Delete namespaces
self._delete_namespaces(pool, ns_queue)

# Cleanup thread resources
pool.shutdown()

return


# Main script
def cleanup_entrypoint():
# Init the cleaner class
cleaner = CloudCleanup()

# Namespaces
cleaner.clean_namespaces()


if __name__ == "__main__":
cleanup_entrypoint()
sys.exit(0)
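
For reference, a sketch of the minimal globals.json shape that satisfies the fields this PR's code actually reads; the full CloudClusterConfig schema may require additional keys, and every value below is a placeholder:

import json

# Hypothetical minimal globals.json: only the keys read by
# RpCloudApiClient (oauth_* and api_url) under "cloud_cluster".
config = {
    "cloud_cluster": {
        "oauth_url": "https://auth.example.com/oauth/token",
        "oauth_client_id": "<client-id>",
        "oauth_client_secret": "<client-secret>",
        "oauth_audience": "cloudv2-api",
        "api_url": "https://api.example.com",
    }
}
with open("globals.json", "w") as f:
    json.dump(config, f, indent=2)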
172 changes: 172 additions & 0 deletions tests/rptest/services/provider_clients/rpcloud_client.py
@@ -0,0 +1,172 @@
import requests


class RpCloudApiClient(object):
def __init__(self, config, log):
self._config = config
self._token = None
self._logger = log
self.lasterror = None

def _handle_error(self, response):
try:
response.raise_for_status()
except requests.HTTPError as e:
self.lasterror = f'{e} {response.text}'
self._logger.error(self.lasterror)
raise e
return response

def _get_token(self):
"""
Returns access token to be used in subsequent api calls to cloud api.

        To avoid repeated token generation, this function caches the token
        in an instance attribute. It assumes the token's expiration lasts
        throughout the usage of this cluster.

:return: access token as a string
"""

if self._token is None:
headers = {'Content-Type': "application/x-www-form-urlencoded"}
data = {
'grant_type': 'client_credentials',
'client_id': f'{self._config.oauth_client_id}',
'client_secret': f'{self._config.oauth_client_secret}',
'audience': f'{self._config.oauth_audience}'
}
resp = requests.post(f'{self._config.oauth_url}',
headers=headers,
data=data)
_r = self._handle_error(resp)
if _r is None:
return _r
j = resp.json()
self._token = j['access_token']
return self._token

def _http_get(self,
endpoint='',
base_url=None,
override_headers=None,
text_response=False,
**kwargs):
headers = override_headers
if headers is None:
token = self._get_token()
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
_base = base_url if base_url else self._config.api_url
resp = requests.get(f'{_base}{endpoint}', headers=headers, **kwargs)
_r = self._handle_error(resp)
if text_response:
return _r if _r is None else _r.text
return _r if _r is None else _r.json()

def _http_post(self, base_url=None, endpoint='', **kwargs):
token = self._get_token()
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
if base_url is None:
base_url = self._config.api_url
resp = requests.post(f'{base_url}{endpoint}',
headers=headers,
**kwargs)
_r = self._handle_error(resp)
return _r if _r is None else _r.json()

def _http_delete(self, endpoint='', **kwargs):
token = self._get_token()
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
}
resp = requests.delete(f'{self._config.api_url}{endpoint}',
headers=headers,
**kwargs)
_r = self._handle_error(resp)
return _r if _r is None else _r.json()

@staticmethod
def namespace_endpoint(uuid=None):
_e = "/api/v1/namespaces"
if uuid:
_e += f"/{uuid}"
return _e

@staticmethod
def cluster_endpoint(id=None):
_e = "/api/v1/clusters"
if id:
_e += f"/{id}"
return _e

@staticmethod
def network_endpoint(id=None):
_e = "/api/v1/networks"
if id:
_e += f"/{id}"
return _e

@staticmethod
def network_peering_endpoint(id=None, peering_id=None):
_e = "/api/v1/networks"
if id:
_e += f"/{id}/network-peerings"
if peering_id:
_e += f"/{peering_id}"
return _e

def _prepare_params(self, ns_uuid=None):
params = {}
if ns_uuid:
params['namespaceUuid'] = ns_uuid
return params

def list_namespaces(self, include_deleted=False):
        # Use local var to manipulate output
_ret = self._http_get(self.namespace_endpoint())
# Filter out deleted ones
if include_deleted:
_namespaces = _ret
else:
_namespaces = [n for n in _ret if not n['deleted']]
# return it
return _namespaces

def list_networks(self, ns_uuid=None):
# get networks for a namespace
_ret = self._http_get(self.network_endpoint(),
params=self._prepare_params(ns_uuid))
# return it
return _ret

def list_clusters(self, ns_uuid=None):
        # get clusters for a namespace
_ret = self._http_get(self.cluster_endpoint(),
params=self._prepare_params(ns_uuid))
# return it
return _ret

def list_network_peerings(self, network_id, ns_uuid=None):
_ret = self._http_get(self.network_peering_endpoint(id=network_id),
params=self._prepare_params(ns_uuid=ns_uuid))
return _ret

def get_network(self, network_id):
_network = self._http_get(self.network_endpoint(id=network_id))
return _network

def delete_namespace(self, uuid):
_r = self._http_delete(endpoint=self.namespace_endpoint(uuid=uuid))
# Check status
return _r

def delete_resource(self, resource_handle):
_r = self._http_delete(endpoint=resource_handle)
self._logger.debug(f"...delete requested for '{resource_handle}'")
return _r
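
For completeness, a short sketch of using the client on its own, outside the cleanup script. It assumes a globals.json as sketched above; list_namespaces() and namespace_endpoint() are the same calls the cleanup utility builds its delete queue from:

import json
import logging

from rptest.services.redpanda_cloud import CloudClusterConfig
from rptest.services.provider_clients.rpcloud_client import RpCloudApiClient

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("rpcloud")

# Build the client from the same config the cleanup util loads.
with open("globals.json") as f:
    config = CloudClusterConfig(**json.load(f)["cloud_cluster"])
client = RpCloudApiClient(config, log)

# Live (non-deleted) namespaces only; include_deleted=True lists all.
for ns in client.list_namespaces():
    log.info(f"{ns['name']}: {client.namespace_endpoint(uuid=ns['id'])}")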