diff --git a/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-manage-sanity.yaml b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-manage-sanity.yaml index 0c5effb2..16bdb94f 100644 --- a/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-manage-sanity.yaml +++ b/instance-applications/510-550-ibm-mas-suite-app-config/templates/04-postsync-manage-sanity.yaml @@ -207,11 +207,14 @@ data: import os import urllib3 import requests + import certifi import random import logging + import tempfile import base64 from assertpy import assert_that import time + logger = logging.getLogger() authToken="" MANAGE_URL = "" @@ -235,7 +238,6 @@ data: asset="sanity"+str(random.randint(0,1000)) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) session = requests.Session() - session.verify = False mas_instance_id = os.getenv("MAS_INSTANCE_ID") if mas_instance_id is None: @@ -248,7 +250,7 @@ data: manageNamespace = f"mas-{mas_instance_id}-manage" masNamespace = f"mas-{mas_instance_id}-core" - manage_route_name = f"{mas_instance_id}-{mas_workspace_id}" + manage_route_name = f"{mas_instance_id}-manage-{mas_workspace_id}" @pytest.fixture(scope="session") def dyn_client(): @@ -325,6 +327,60 @@ data: except KeyError as e: assert False, f"Unable to determine Manage host; spec.host key not present in {manage_route_name}/{manageNamespace}: {manage_route}. Error details: {e}" + @pytest.fixture(scope="session") + def manage_host_ca_filepath(manage_route): + + # Read the certificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the external Manage endpoint. + try: + manage_route_certificate = manage_route['spec']['tls']['certificate'] + except KeyError as e: + pass + + # Read the caCertificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the external Manage endpoint. + try: + manage_route_caCertificate = manage_route['spec']['tls']['caCertificate'] + except KeyError as e: + pass + + # Read the destinationCACertificate field from the Manage Route. + # This may include CA certificates that we need in order to trust the certificates presented by the internal Manage services. + try: + manage_route_destinationCACertificate = manage_route['spec']['tls']['destinationCACertificate'] + except KeyError as e: + pass + + # Load default CA bundle. This will include certs for well-known CAs. This ensures that we will + # trust the certificates presented by the external Manage endpoints when MAS is configured to use + # an external frontend like CIS. + with open(certifi.where(), 'rb') as default_ca: + default_ca_content = default_ca.read() + + # Combine all of the above into a single .pem file that we can use when issuing HTTP requests + chain_file = tempfile.NamedTemporaryFile(delete=False) + try: + + if manage_route_certificate: + chain_file.write(manage_route_certificate.encode()) + + if manage_route_caCertificate: + chain_file.write(manage_route_certificate.encode()) + + if manage_route_destinationCACertificate: + chain_file.write(manage_route_destinationCACertificate.encode()) + + chain_file.write(default_ca_content) + + chain_file.flush() + chain_file.close() + + yield chain_file.name + + finally: + os.remove(chain_file.name) + + # Obtain Manage Details def test_getManageComponents(dyn_client): manageWorkspaces = dyn_client.resources.get(api_version='apps.mas.ibm.com/v1', kind='ManageWorkspace') @@ -555,7 +611,7 @@ data: # execute_query(connection, update_query) # logger.info(f"Permissions for {application} applied successfully.") - def test_create_super_user_token(): + def test_create_super_user_token(manage_host_ca_filepath): url = MAS_URL+"logininitial" querystring = {"verify":"False"} @@ -567,12 +623,12 @@ data: "Content-Type": "application/json" } - response = requests.post(url, json=payload, headers=headers, params=querystring) + response = requests.post(url, json=payload, headers=headers, params=querystring, verify=manage_host_ca_filepath) global authToken authToken = response.json()["token"] logger.info(authToken) - def test_v3_create_user(): + def test_v3_create_user(manage_host_ca_filepath): # GET Same User first and verify whether it is already present or NOT! url = MAS_APIURL + "v3/users/" + manage_user querystring = {"lean": "1"} @@ -580,34 +636,34 @@ data: "Content-Type": "application/json", "x-access-token": authToken } - response = requests.get(url, headers=headers, params=querystring) + response = requests.get(url, headers=headers, params=querystring, verify=manage_host_ca_filepath) if response.status_code == 404: logger.info("User was not present and being created.....") - create_user() + create_user(manage_host_ca_filepath) elif response.status_code == 200: logger.info("User was already present, deleting first.....") # Keep deleting the user until a 404 status code is received while True: - delete_response = delete_user() + delete_response = delete_user(manage_host_ca_filepath) if delete_response.status_code == 404: break # create the user as its deleted now - create_user() + create_user(manage_host_ca_filepath) - def delete_user(): + def delete_user(manage_host_ca_filepath): url = MAS_APIURL + "v3/users/" + manage_user querystring = {"lean": "1"} headers = { "Content-Type": "application/json", "x-access-token": authToken } - response = requests.delete(url, headers=headers, params=querystring) + response = requests.delete(url, headers=headers, params=querystring, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_in(204, 404) return response - def create_user(): + def create_user(manage_host_ca_filepath): url = MAS_APIURL + "v3/users" querystring = {"lean": "1"} payload = { @@ -642,11 +698,11 @@ data: "Content-Type": "application/json", "x-access-token": authToken } - response = requests.post(url, json=payload, headers=headers, params=querystring) + response = requests.post(url, json=payload, headers=headers, params=querystring, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_equal_to(201) - def test_v3_user_link_to_idp(): + def test_v3_user_link_to_idp(manage_host_ca_filepath): url = MAS_APIURL+"v3/users/"+manage_user+"/idps/local" # url = "https://api.fvtsaas.ibmmasfvt.com/v3/users/saasuser/idps/local" querystring = {"emailPassword":"False"} payload = {"idpUserId": manage_user} @@ -654,43 +710,43 @@ data: "Content-Type": "application/json", "x-access-token": authToken } - response = requests.put(url, json=payload, headers=headers, params=querystring) + response = requests.put(url, json=payload, headers=headers, params=querystring, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_equal_to(200) - def test_v3_user_add_user_to_workspace(): + def test_v3_user_add_user_to_workspace(manage_host_ca_filepath): url = MAS_APIURL+"workspaces/"+mas_workspace_id+"/users/"+manage_user #url = "https://api.fvtsaas.ibmmasfvt.com/workspaces/masdev/users/saasUser" payload = {"permissions": {"workspaceAdmin": False}} headers = { "Content-Type": "application/json", "x-access-token": authToken } - response = requests.put(url, json=payload, headers=headers) + response = requests.put(url, json=payload, headers=headers, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_equal_to(200) - def test_v3_user_application_permission(): + def test_v3_user_application_permission(manage_host_ca_filepath): url = MAS_APIURL+"workspaces/"+mas_workspace_id+"/applications/manage/users/"+manage_user #url = "https://api.fvtsaas.ibmmasfvt.com/workspaces/masdev/applications/manage/users/saasUser" payload = {"role": "MANAGEUSER"} headers = { "Content-Type": "application/json", "x-access-token": authToken } - response = requests.put(url, json=payload, headers=headers) + response = requests.put(url, json=payload, headers=headers, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_equal_to(200) - def test_check_user_sync(): + def test_check_user_sync(manage_host_ca_filepath): headers = {"Authorization": f"Bearer {authToken}"} - def get_user(): + def get_user(manage_host_ca_filepath): url = f"{MAS_APIURL}/v3/users/{manage_user}" - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, verify=manage_host_ca_filepath) response.raise_for_status() # Raises an exception for HTTP errors return response.json() # Obtain user just created to check sync status - test_user = get_user() + test_user = get_user(manage_host_ca_filepath) # Wait for sync status to become SUCCESS within 10 minutes logger.debug("Waiting for user coordinator to update sync status") @@ -703,7 +759,7 @@ data: logger.debug(f"User sync has not been completed yet for app manage: {t_end - time.time():.2f} seconds remaining") time.sleep(5) # Re-check the user sync status - test_user = get_user() + test_user = get_user(manage_host_ca_filepath) # Verify the sync status is SUCCESS after timeout period assert test_user["applications"]["manage"]["sync"]["state"] == "SUCCESS" @@ -735,24 +791,24 @@ data: logger.info(cert_content) return client_cert - def test_generate_api_key_for_new_added_user(client_cert): + def test_generate_api_key_for_new_added_user(client_cert, manage_host_ca_filepath): #user_id="maxadmin" global user_api_key user_id=manage_user - api_key = get_api_key(user_id, client_cert, session) + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) if api_key is None: url = MANAGE_URL + '/maximo/api/os/mxapiapikey?ccm=1&lean=1' logger.info("generate_api_key_URL: " + url) logger.info("generate_api_key_CERT" + str(client_cert)) resp = "" try: - # resp=requests.post(url , data ={'expiration':'-1', 'user_id': user_id}, cert=client_cert) + # resp=requests.post(url , data ={'expiration':'-1', 'user_id': user_id}, cert=client_cert, verify=manage_host_ca_filepath) headers = {'content-type': 'application/json'} payload = {'expiration': '-1', 'userid': user_id} resp = session.post(url, headers=headers, - json=payload, cert=client_cert, timeout=600) + json=payload, cert=client_cert, timeout=600, verify=manage_host_ca_filepath) if resp.status_code <= 201: - api_key = get_api_key(user_id, client_cert, session) + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) logger.info(f"GENERATED MXAPIKEY for {user_id} is: " + api_key) user_api_key = api_key return api_key @@ -770,22 +826,22 @@ data: user_api_key = api_key return api_key - def test_get_api_key_for_admin(client_cert): + def test_get_api_key_for_admin(client_cert, manage_host_ca_filepath): global admin_api_key - api_key = get_api_key(user_id, client_cert, session) + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) if api_key is None: url = MANAGE_URL + '/maximo/api/os/mxapiapikey?ccm=1&lean=1' logger.info("generate_api_key_URL: " + url) logger.info("generate_api_key_CERT" + str(client_cert)) resp = "" try: - # resp=requests.post(url , data ={'expiration':'-1', 'user_id': user_id}, cert=client_cert) + # resp=requests.post(url , data ={'expiration':'-1', 'user_id': user_id}, cert=client_cert, verify=manage_host_ca_filepath) headers = {'content-type': 'application/json'} payload = {'expiration': '-1', 'userid': user_id} resp = session.post(url, headers=headers, - json=payload, cert=client_cert, timeout=600) + json=payload, cert=client_cert, timeout=600, verify=manage_host_ca_filepath) if resp.status_code <= 201: - api_key = get_api_key(user_id, client_cert, session) + api_key = get_api_key(user_id, client_cert, session, manage_host_ca_filepath) logger.info(f"GENERATED MXAPIKEY for {user_id} is: " + api_key) admin_api_key = api_key return api_key @@ -804,12 +860,12 @@ data: return api_key - def get_api_key(user_id, client_cert, session): + def get_api_key(user_id, client_cert, session, manage_host_ca_filepath): api_key = None try: get_api_key_url=f'''{MANAGE_URL}/maximo/api/os/mxapiapikey?lean=1&ccm=1&oslc.select=*&oslc.where=userid%3D%22{user_id}%22''' logger.info("get APIKEY URL: " + get_api_key_url) - resp = session.get(get_api_key_url, cert=client_cert, timeout=600) + resp = session.get(get_api_key_url, cert=client_cert, timeout=600, verify=manage_host_ca_filepath) if resp.status_code == 200: data = resp.json() api_key = data['member'][0]['apikey'] @@ -821,14 +877,14 @@ data: logger.info(ex) return api_key - def test_add_new_user_to_admin_group(): - groupid= get_group_id(user_id, session) + def test_add_new_user_to_admin_group(manage_host_ca_filepath): + groupid = get_group_id(user_id, session, manage_host_ca_filepath) user_group_url=f'''{MANAGE_URL}/maximo/api/os/mxapigroup/{groupid}?lean=1''' payload = { "groupuser": [ { - "userid": f"{manage_user}" + "userid": f"{manage_user}" } ] } @@ -838,16 +894,16 @@ data: "patchtype": "MERGE", "apikey": admin_api_key } - response = session.post(user_group_url, json=payload, headers=headers) + response = session.post(user_group_url, json=payload, headers=headers, verify=manage_host_ca_filepath) assert_that(response.status_code).is_equal_to(204) - def get_group_id(user_id, session): + def get_group_id(user_id, session, manage_host_ca_filepath): group_url = f'''{MANAGE_URL}/maximo/api/os/mxapigroup?lean=1&oslc.select=*&oslc.where=groupname%3D%22{user_id}%22''' headers = { "Content-Type": "application/json", "apikey": admin_api_key } - response = session.get(group_url, headers=headers) + response = session.get(group_url, headers=headers, verify=manage_host_ca_filepath) if response.status_code == 200: data = response.json() group_id = data['member'][0]['maxgroupid'] @@ -856,31 +912,31 @@ data: ###ASSET Object CRUD Operation using Newly created user as User is now added to maxadmin group - def test_asset_crud_operation_using_new_added_user(): + def test_asset_crud_operation_using_new_added_user(manage_host_ca_filepath): #Check if asset exists, delete it if present - assetid = get_asset(asset,session) + assetid = get_asset(asset, session, manage_host_ca_filepath) if assetid is not None: logger.info("assetid not none") delete_asset(assetid,session) # Create Asset - create_asset(asset,session) + create_asset(asset, session, manage_host_ca_filepath) # Get Asset id for newly created Asset time.sleep(20) - assetid = get_asset(asset,session) + assetid = get_asset(asset, session, manage_host_ca_filepath) logger.info(f"ASSETID+{assetid}") time.sleep(20) # Update newly created Asset - update_asset(assetid,session) + update_asset(assetid, session, manage_host_ca_filepath) # Delete newly created Asset - delete_asset(assetid,session) + delete_asset(assetid, session, manage_host_ca_filepath) - def get_asset(asset,session): + def get_asset(asset, session, manage_host_ca_filepath): asset_url = f'''{MANAGE_URL}/maximo/api/os/mxapiasset?lean=1&oslc.select=assetnum&oslc.where=assetnum%3D%22{asset}%22''' headers = { "Content-Type": "application/json", "apikey": user_api_key } - response = session.get(asset_url, headers=headers) + response = session.get(asset_url, headers=headers, verify=manage_host_ca_filepath) assert_that(response.status_code).is_equal_to(200) # Check if the member array is empty @@ -893,7 +949,7 @@ data: # Asset does not exist return None - def create_asset(asset,session): + def create_asset(asset, session, manage_host_ca_filepath): asset_url = f'''{MANAGE_URL}/maximo/api/os/mxapiasset?lean=1''' payload = { "_action": "AddChange", @@ -906,10 +962,10 @@ data: "Content-Type": "application/json", "apikey": user_api_key } - response = session.post(asset_url, json=payload, headers=headers) + response = session.post(asset_url, json=payload, headers=headers, verify=manage_host_ca_filepath) assert_that(response.status_code).is_equal_to(201) - def update_asset(assetid,session): + def update_asset(assetid, session, manage_host_ca_filepath): asset_url = MANAGE_URL+"/maximo/api/os/mxapiasset/"+assetid payload = { "_action": "AddChange", @@ -922,11 +978,11 @@ data: "patchtype": "MERGE", "apikey": user_api_key } - response = session.post(asset_url, json=payload, headers=headers, params=querystring) + response = session.post(asset_url, json=payload, headers=headers, params=querystring, verify=manage_host_ca_filepath) logger.info(response.text) assert_that(response.status_code).is_equal_to(204) - def delete_asset(assetid,session): + def delete_asset(assetid, session, manage_host_ca_filepath): asset_url = MANAGE_URL+"/maximo/api/os/mxapiasset/"+assetid querystring = {"lean":"1"} headers = { @@ -935,7 +991,7 @@ data: "patchtype": "MERGE", "apikey": user_api_key } - response = session.delete(asset_url, headers=headers, params=querystring) + response = session.delete(asset_url, headers=headers, params=querystring, verify=manage_host_ca_filepath) assert_that(response.status_code).is_equal_to(204)