From ef66f444ebb21686cabf527c0686536afe0893c5 Mon Sep 17 00:00:00 2001 From: whitfiea Date: Wed, 11 Dec 2024 13:02:00 +0000 Subject: [PATCH] Add verify tests --- .../04-postsync-maximoit-sanity.yaml | 7 - .../04-postsync-maximoit-verify.yaml | 512 ++++++++++++++++++ 2 files changed, 512 insertions(+), 7 deletions(-) create mode 100644 instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-verify.yaml diff --git a/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-sanity.yaml b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-sanity.yaml index dcb0a638..1daed52c 100644 --- a/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-sanity.yaml +++ b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-sanity.yaml @@ -142,7 +142,6 @@ data: kubernetes openshift requests==2.31.0 - assertpy==1.1 urllib3==1.26.18 tests.py: |- @@ -158,7 +157,6 @@ data: import logging import tempfile import base64 - from assertpy import assert_that import string @@ -176,10 +174,6 @@ data: if manageNamespace is None: raise Exception(f"Required MANAGE_NAMESPACE environment variable is not set") - masNamespace = os.getenv("MAS_NAMESPACE") - if masNamespace is None: - raise Exception(f"Required MAS_NAMESPACE environment variable is not set") - MANAGE_URL = f'https://{mas_instance_id}-{mas_workspace_id}.mas-{mas_instance_id}-manage.svc' # Use for cluster # MANAGE_URL = 'https://localhost:9443' # Use for local @@ -912,7 +906,6 @@ data: #***********************Test Functions*************** #TC#1 Test to verify system info has Maximo IT def test_system_info(manage_host_ca_filepath, headers): - print(headers) response = requests.get(MANAGE_URL + '/maximo/api/systeminfo', headers=headers, verify=manage_host_ca_filepath) # Check if the response code is 200 OK assert response.status_code == 200, "System Info fetch failed." diff --git a/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-verify.yaml b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-verify.yaml new file mode 100644 index 00000000..983f50d7 --- /dev/null +++ b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-maximoit-verify.yaml @@ -0,0 +1,512 @@ +{{- if eq .Values.mas_app_id "manage" }} +{{- if .Values.mas_appws_spec.components.icd }} + +# A sanity test is one that can be disruptive i.e. it can create new users, call authenticated apis, creates resources +# in the application. This type of test should only be run in a downstream environment such as a dev or staging env +# The control over if these tests run or not is controlled by the `run_sanity_test` boolean in the values + +{{ $ns := .Values.mas_app_namespace }} +{{ $np_name := "postsync-verify-maximoit-np" }} +{{ $role_name := "postsync-verify-maximoit-role" }} +{{ $sa_name := "postsync-verify-maximoit-sa" }} +{{ $rb_name := "postsync-verify-maximoit-rb" }} +{{ $tests_cm_name := "postsync-verify-tests-maximoit-cm" }} +{{ $record_cm_name := "postsync-verify-tests-maximoit-record-cm" }} +{{ $job_name := "postsync-verify-maximoit-job" }} + + +--- +# Permit outbound communication by the Job pod +# (Needed to communicate with the K8S HTTP API, PyPI, manage Route) +kind: NetworkPolicy +apiVersion: networking.k8s.io/v1 +metadata: + name: {{ $np_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "600" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +spec: + podSelector: + matchLabels: + app: {{ $job_name }} + egress: + - {} + policyTypes: + - Egress + + +--- +kind: ServiceAccount +apiVersion: v1 +metadata: + name: {{ $sa_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "600" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} + + + +--- +# ------------------------------------- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ $role_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "600" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +rules: + - verbs: + - get + - list + apiGroups: + - "route.openshift.io" + - "" + resources: + - routes + - secrets + + - verbs: + - get + - list + - patch + apiGroups: + - "" + resources: + - configmaps +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ $rb_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "601" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +subjects: + - kind: ServiceAccount + name: {{ $sa_name }} + namespace: {{ $ns }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ $role_name }} +# ------------------------------------- + +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: {{ $tests_cm_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "602" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +immutable: false +data: + requirements.txt: |- + pytest + kubernetes + openshift + requests==2.31.0 + urllib3==1.26.18 + tests.py: |- + + from kubernetes import client,config + from kubernetes.client import Configuration + from openshift.dynamic import DynamicClient + import pytest + import os + import urllib3 + import requests + import certifi + import logging + import tempfile + import base64 + + + logger = logging.getLogger() + + mas_instance_id = os.getenv("MAS_INSTANCE_ID") + if mas_instance_id is None: + raise Exception(f"Required MAS_INSTANCE_ID environment variable is not set") + + mas_workspace_id = os.getenv("MAS_WORKSPACE_ID") + if mas_workspace_id is None: + raise Exception(f"Required MAS_WORKSPACE_ID environment variable is not set") + + manageNamespace = os.getenv("MANAGE_NAMESPACE") + if manageNamespace is None: + raise Exception(f"Required MANAGE_NAMESPACE environment variable is not set") + + + MANAGE_URL = f'https://{mas_instance_id}-{mas_workspace_id}.mas-{mas_instance_id}-manage.svc' # Use for cluster + # MANAGE_URL = 'https://localhost:9443' # Use for local + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + session = requests.Session() + + + @pytest.fixture(scope="session") + def dyn_client(): + if "KUBERNETES_SERVICE_HOST" in os.environ: + config.load_incluster_config() + k8s_config = Configuration.get_default_copy() + k8s_client = client.ApiClient(configuration=k8s_config) + else: + k8s_client = config.new_client_from_config() + + dyn_client = DynamicClient(k8s_client) + dyn_client.namespace = manageNamespace + yield dyn_client + + @pytest.fixture(scope="session") + def v1_routes(dyn_client): + yield dyn_client.resources.get(api_version='route.openshift.io/v1', kind='Route') + + @pytest.fixture(scope="session") + def manage_route(v1_routes): + yield v1_routes.get(name=f"{mas_instance_id}-manage-{mas_workspace_id}", namespace=manageNamespace) + + @pytest.fixture(scope="session") + def manage_host_ca_filepath(manage_route): + + # Read the certificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the external Manage endpoint. + try: + manage_route_certificate = manage_route['spec']['tls']['certificate'] + except KeyError as e: + pass + + # Read the caCertificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the external Manage endpoint. + try: + manage_route_caCertificate = manage_route['spec']['tls']['caCertificate'] + except KeyError as e: + pass + + # Read the destinationCACertificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the internal Manage services. + try: + manage_route_destinationCACertificate = manage_route['spec']['tls']['destinationCACertificate'] + except KeyError as e: + pass + + # Load default CA bundle. This will include certs for well-known CAs. This ensures that we will + # trust the certificates presented by the external Manage endpoints when MAS is configured to use + # an external frontend like CIS. + with open(certifi.where(), 'rb') as default_ca: + default_ca_content = default_ca.read() + + # Combine all of the above into a single .pem file that we can use when issuing HTTP requests + chain_file = tempfile.NamedTemporaryFile(delete=False) + try: + + if manage_route_certificate: + chain_file.write(manage_route_certificate.encode()) + + if manage_route_caCertificate: + chain_file.write(manage_route_certificate.encode()) + + if manage_route_destinationCACertificate: + chain_file.write(manage_route_destinationCACertificate.encode()) + + chain_file.write(default_ca_content) + + chain_file.flush() + chain_file.close() + + yield chain_file.name + + finally: + os.remove(chain_file.name) + + + @pytest.fixture(scope="session") + def client_cert(dyn_client): + certSecretObj = dyn_client.resources.get(api_version='v1', kind='Secret') + certSecret = certSecretObj.get( + f"{mas_instance_id}-internal-manage-tls", f"mas-{mas_instance_id}-manage" + ) + internal_cert = certSecret.data["tls.crt"] + internal_key = certSecret.data["tls.key"] + decoded_internal_cert = base64.b64decode(internal_cert).decode('utf-8') + decoded_internal_key = base64.b64decode(internal_key).decode('utf-8') + + with open("internal.crt", "w") as f: + f.write(decoded_internal_cert) + with open("internal.key", "w") as f: + f.write(decoded_internal_key) + client_cert = ("internal.crt", "internal.key") + + with open("internal.crt", "r") as f: + cert_content = f.read() + logger.info("Contents of cert:") + logger.info(cert_content) + with open("internal.key", "r") as f: + cert_content = f.read() + logger.info("Contents of cert:") + logger.info(cert_content) + return client_cert + + + @pytest.fixture() + def user_api_key(client_cert, manage_host_ca_filepath): + user_id="maxadmin" + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) + if api_key is None: + url = MANAGE_URL + '/maximo/api/os/mxapiapikey?ccm=1&lean=1' + logger.info("generate_api_key_URL: " + url) + logger.info("generate_api_key_CERT" + str(client_cert)) + resp = "" + try: + # resp=requests.post(url , data ={'expiration':'-1', 'user_id': user_id}, cert=client_cert, verify=manage_host_ca_filepath) + headers = {'content-type': 'application/json'} + payload = {'expiration': '-1', 'userid': user_id} + resp = session.post(url, headers=headers, + json=payload, cert=client_cert, timeout=600, verify=manage_host_ca_filepath) + if resp.status_code <= 201: + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) + logger.info(f"GENERATED MXAPIKEY for {user_id} is: " + api_key) + yield api_key + else: + logger.info("Failed to Create APIKEY: " + user_id) + logger.info(resp.status_code) + logger.info(resp.text) + yield None + + except Exception as ex: + logger.info("Something wrong here") + logger.info(ex) + else: + logger.info(f"RETRIEVED MXAPIKEY for {user_id} is: " + api_key) + yield api_key + + + def get_api_key(user_id, client_cert, session, manage_host_ca_filepath): + api_key = None + try: + get_api_key_url=f'''{MANAGE_URL}/maximo/api/os/mxapiapikey?lean=1&ccm=1&oslc.select=*&oslc.where=userid%3D%22{user_id}%22''' + logger.info("get APIKEY URL: " + get_api_key_url) + resp = session.get(get_api_key_url, cert=client_cert, timeout=600, verify=manage_host_ca_filepath) + if resp.status_code == 200: + data = resp.json() + api_key = data['member'][0]['apikey'] + else: + logger.info("Failed to Get APIKEY for: " + user_id) + logger.info(resp.status_code) + logger.info(resp.text) + except Exception as ex: + logger.info(ex) + return api_key + + #Variables + # Headers for API requests + @pytest.fixture + def headers(user_api_key): + headers = { + 'Accept': 'application/json', + 'apikey': user_api_key + } + yield headers + + + #***********************Test Functions*************** + #TC#1 Test to verify system info has Maximo IT + def test_system_info(manage_host_ca_filepath, headers): + response = requests.get(MANAGE_URL + '/maximo/api/systeminfo', headers=headers, verify=manage_host_ca_filepath) + # Check if the response code is 200 OK + assert response.status_code == 200, "System Info fetch failed." + + # Check if 'Maximo IT' is present in the response text + assert 'Maximo IT' in response.text, "'Maximo IT' not found in the response." + print("'Maximo IT' found in the response.") + + #TC#2 Test to verify Self Serve is installed + def test_self_serve_installed_insystem(manage_host_ca_filepath, headers): + + response = requests.get(f'{MANAGE_URL}/maximo/api/os/mxapimaxapp?oslc.select=*', headers=headers, verify=manage_host_ca_filepath) + print("Response Code ", response.status_code) + assert response.status_code == 200, "Failed to Fetch system information." + systemsinfo = response.json() + systemsinfo.get('oslc:responseInfo') + app_count = systemsinfo['oslc:responseInfo']['oslc:totalCount'] + + # iterate through the application list + for i in range(app_count): + j=0 + app_name = systemsinfo['rdfs:member'][i].get('spi:app') + if app_name == "SELFSERVE": + print("SELFSERVE : Installed in System") + j=1 + break + assert j == 1, "Self Serve is not installed" + + #TC#3 Test to verify Service View is installed + def test_service_view_installed_insystem(manage_host_ca_filepath, headers): + + response = requests.get(f'{MANAGE_URL}/maximo/api/os/mxapimaxapp?oslc.select=*', headers=headers, verify=manage_host_ca_filepath) + print("Response Code ", response.status_code) + assert response.status_code == 200, "Failed to Fetch system information." + systemsinfo = response.json() + systemsinfo.get('oslc:responseInfo') + app_count = systemsinfo['oslc:responseInfo']['oslc:totalCount'] + + # iterate through the application list + + for i in range(app_count): + j=0 + app_name = systemsinfo['rdfs:member'][i].get('spi:app') + if app_name == "SERVICEVIEW": + print("SERVICEVIEW : Installed in System") + j=1 + break + assert j == 1, "SERVICEVIEW is not installed" + + + +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: {{ $record_cm_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "604" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: BeforeHookCreation + labels: + type: mas-app-sanity-record +{{- if .Values.custom_labels }} +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +immutable: false +data: + mas_app: "manage" + mas_catalog_version: "{{ .Values.mas_catalog_version }}" + test_passed: "unknown" + +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ $job_name }} + namespace: {{ $ns }} + annotations: + argocd.argoproj.io/sync-wave: "605" + argocd.argoproj.io/hook: PostSync + argocd.argoproj.io/hook-delete-policy: HookSucceeded,BeforeHookCreation +{{- if .Values.custom_labels }} + labels: +{{ .Values.custom_labels | toYaml | indent 4 }} +{{- end }} +spec: + template: + metadata: + labels: + app: {{ $job_name }} +{{- if .Values.custom_labels }} +{{ .Values.custom_labels | toYaml | indent 8 }} +{{- end }} + spec: + imagePullSecrets: [] + containers: + - name: run + image: quay.io/ibmmas/cli:latest + imagePullPolicy: IfNotPresent + resources: + limits: + cpu: 200m + memory: 512Mi + requests: + cpu: 10m + memory: 64Mi + env: + - name: MAS_INSTANCE_ID + value: "{{ .Values.instance_id }}" + - name: MAS_WORKSPACE_ID + value: "{{ .Values.mas_workspace_id }}" + - name: MANAGE_NAMESPACE + value: "{{ .Values.mas_app_namespace }}" + - name: MAS_NAMESPACE + value: "mas-{{ .Values.instance_id }}-core" + - name: TEST_RECORD_CM + value: "{{ $record_cm_name }}" + volumeMounts: + - name: tests + mountPath: /tmp/tests + command: + - /bin/sh + - -c + - | + python -m venv .venv + source .venv/bin/activate + pip install -r /tmp/tests/requirements.txt + + set -o pipefail + echo "Running tests..." + pytest -v --junit-xml=junitxml_test_output.xml -o cache_dir=/tmp/__pycache__ /tmp/tests/tests.py 2>&1 | tee test_log.txt + if [[ $? -ne 0 ]]; then + TEST_PASSED=false + else + TEST_PASSED=true + fi + echo "Test Result Passed: $TEST_PASSED" + set +o pipefail + + set -e + echo "Updating $TEST_RECORD_CM configmap with test result" + oc set data cm $TEST_RECORD_CM test_passed=$TEST_PASSED + oc set data cm $TEST_RECORD_CM --from-file=junitxml_test_output.xml + oc set data cm $TEST_RECORD_CM --from-file=test_log.txt + + if [[ $TEST_PASSED == "false" ]]; then + echo "Test Result failed, exit 1" + exit 1 + fi + + restartPolicy: Never + serviceAccountName: {{ $sa_name }} + volumes: + - name: tests + configMap: + name: {{ $tests_cm_name }} + items: + - key: requirements.txt + path: requirements.txt + - key: tests.py + path: tests.py + defaultMode: 420 + optional: false + backoffLimit: 10 + +{{- end }} +{{- end }}