From 27f34dc1deb26378b0be6c8c1bf2c9ca3a3d5dd3 Mon Sep 17 00:00:00 2001 From: grindsa Date: Sat, 23 Nov 2024 09:06:16 +0000 Subject: [PATCH] [fix] revocation in nclm_ca_handler --- examples/ca_handler/ejbca_ca_handler.py | 2 +- examples/ca_handler/nclm_ca_handler.py | 288 ++-- test/test_nclm_ca_handler.py | 1766 +++++++---------------- 3 files changed, 724 insertions(+), 1332 deletions(-) diff --git a/examples/ca_handler/ejbca_ca_handler.py b/examples/ca_handler/ejbca_ca_handler.py index 9482ed75..6b15389e 100644 --- a/examples/ca_handler/ejbca_ca_handler.py +++ b/examples/ca_handler/ejbca_ca_handler.py @@ -231,7 +231,7 @@ def _status_get(self) -> Dict[str, str]: self.logger.error('CAhandler._ca_get() returned error: %s', str(err_)) api_response = {'status': 'nok', 'error': str(err_)} else: - self.logger.error('CAhandler._status_get(): api_host parameter is misisng in configuration') + self.logger.error('CAhandler._status_get(): api_host parameter is missing in configuration') api_response = {} self.logger.debug('CAhandler._status_get() ended') diff --git a/examples/ca_handler/nclm_ca_handler.py b/examples/ca_handler/nclm_ca_handler.py index 53229c29..e3c75334 100644 --- a/examples/ca_handler/nclm_ca_handler.py +++ b/examples/ca_handler/nclm_ca_handler.py @@ -4,10 +4,10 @@ import os import time import json -from typing import Tuple, Dict +from typing import List, Tuple, Dict import requests # pylint: disable=e0401, r0913 -from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc +from acme_srv.helper import load_config, build_pem_file, b64_encode, b64_url_recode, convert_string_to_byte, cert_serial_get, uts_now, parse_url, proxy_check, error_dic_get, uts_to_date_utc, header_info_get, eab_profile_header_info_check, config_eab_profile_load, config_headerinfo_load class CAhandler(object): @@ -28,6 +28,9 @@ def __init__(self, _debug=None, logger=None): self.wait_interval = 5 self.proxy = None self.request_timeout = 20 + self.header_info_field = False + self.eab_handler = None + self.eab_profiling = False def __enter__(self): """ Makes CAhandler a Context Manager """ @@ -83,7 +86,6 @@ def _ca_policylink_id_lookup(self) -> int: self.logger.debug('CAhandler._ca_policylink_id_lookup()') # query CAs - # ca_list = requests.get(self.api_host + f'/policy/ca?entityRef=CONTAINER&entityId={self.container_info_dic["id"]}&allowedOnly=true&withTemplateById=0&enrollWithImportedCSR=true&csrHasPrivateKey=false&csrTemplateVersion=0', headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() ca_list = requests.get(f'{self.api_host}{self.api_version}/containers/{self.container_info_dic["id"]}/issuers', headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() if 'items' in ca_list: ca_id = self._ca_id_get(ca_list) @@ -98,6 +100,54 @@ def _ca_policylink_id_lookup(self) -> int: self.logger.debug('CAhandler._ca_policylink_id_lookup() ended with: %s', ca_id) return ca_id + def _cert_enroll(self, csr: str, policylink_id: int) -> Tuple[str, str, str]: + """ enroll operation """ + self.logger.debug('CAhandler._cert_enroll()') + + error = None + cert_bundle = None + cert_raw = None + cert_id = None + + # post csr + job_id = self._csr_post(csr, policylink_id) + + if job_id: + cert_id = self._cert_id_get(job_id) + if cert_id: + (error, cert_bundle, cert_raw) = self._cert_bundle_build(cert_id) + else: + self.logger.error('CAhandler.eroll(): certifcate_id lookup failed for job: %s', job_id) + error = 'Certifcate_id lookup failed' + else: + self.logger.error('CAhandler.eroll(): job_id lookup failed for job') + error = 'job_id lookup failed' + + self.logger.debug('CAhandler._cert_enroll() ended with error: %s', error) + return (error, cert_bundle, cert_raw, cert_id) + + def _csr_post(self, csr: str, policylink_id: int) -> Dict[str, str]: + """ post csr """ + self.logger.debug('CAhandler._csr_post()') + + job_id = None + # build_pem_file + csr = build_pem_file(self.logger, None, csr, 64, True) + csr = b64_encode(self.logger, convert_string_to_byte(csr)) + data_dic = {'allowDuplicateCn': True, 'request': {'pkcs10': csr}} + + # add template if correctly configured + if 'id' in self.template_info_dic and self.template_info_dic['id']: + data_dic['template'] = {'id': self.template_info_dic['id']} + + response = self._api_post(f"{self.api_host}{self.api_version}/containers/{self.container_info_dic['id']}/issuers/{policylink_id}/csr", data_dic) + + if 'id' in response: + job_id = response['id'] + + self.logger.debug('CAhandler._csr_post() ended with: %s', job_id) + return job_id + def _issuer_certid_get(self, cert_dic: Tuple[str, str]) -> Tuple[str, bool]: """ get cert id of issuer """ self.logger.debug('CAhandler._issuer_certid_get()') @@ -167,6 +217,51 @@ def _cert_id_get(self, job_id: int) -> int: self.logger.debug('CAhandler._cert_id_get() ended with: %s', cert_id) return cert_id + def _certid_get_from_serial(self, cert_raw: str) -> List[str]: + """ get certificates """ + self.logger.debug('CAhandler._certid_get_from_serial()') + + cert_serial = cert_serial_get(self.logger, cert_raw, hexformat=True) + + # search for certificate + try: + cert_list = requests.get(f"{self.api_host}{self.api_version}/certificates?freeText=={cert_serial}&containerId={self.container_info_dic['id']}", headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() + except Exception as err_: + self.logger.error('CAhandler._certid_get_from_serial(): request get aborted with err: %s', err_) + cert_list = [] + + if cert_list and 'items' in cert_list and len(cert_list['items']) > 0 and 'id' in cert_list['items'][0]: + cert_id = cert_list['items'][0]['id'] + else: + cert_id = None + self.logger.error('CAhandler._certid_get_from_serial(): no certificate found for serial: %s', cert_serial) + + self.logger.debug('CAhandler._certid_get_from_serial() ended with code: %s', cert_id) + return cert_id + + def _cert_id_lookup(self, cert_raw: str) -> int: + """ get tracking id """ + self.logger.debug('CAhandler._cert_id_lookup()') + + cert_id = None + + # we misuse header_info_get() to get the tracking id from database + cert_recode = b64_url_recode(self.logger, cert_raw) + pid_list = header_info_get(self.logger, csr=cert_recode, vlist=['poll_identifier'], field_name='cert_raw') + + for ele in pid_list: + if 'poll_identifier' in ele: + cert_id = ele['poll_identifier'] + break + + if not cert_id: + # lookup through NCLM API + self.logger.info('CAhandler._cert_id_lookup(): cert_id not found in database. Lookup trough NCLM API') + cert_id = self._certid_get_from_serial(cert_raw) + + self.logger.debug('CAhandler._cert_id_lookup() ended with %s', cert_id) + return cert_id + def _config_api_access_check(self): """ check config for consitency """ self.logger.debug('CAhandler._config_api_access_check()') @@ -316,9 +411,34 @@ def _config_load(self): self._config_timer_load(config_dic) self._config_proxy_load(config_dic) + # load profiling + self.eab_profiling, self.eab_handler = config_eab_profile_load(self.logger, config_dic) + # load header info + self.header_info_field = config_headerinfo_load(self.logger, config_dic) self.logger.debug('CAhandler._config_load() ended') + def _container_id_lookup(self): + """ get target system id based on name """ + self.logger.debug('CAhandler._container_id_lookup() for tsg: %s', self.container_info_dic['name']) + try: + tsg_list = requests.get(self.api_host + '/containers?freeText=' + str(self.container_info_dic['name']) + '&offset=0&limit=50&fetchPath=true', headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() + except Exception as err_: + self.logger.error('CAhandler._container_id_lookup() returned error: %s', err_) + tsg_list = [] + + if 'items' in tsg_list: + for tsg in tsg_list['items']: + if 'name' in tsg and 'id' in tsg: + if self.container_info_dic['name'] == tsg['name']: + self.container_info_dic['id'] = tsg['id'] + break + else: + self.logger.error('CAhandler._container_id_lookup() incomplete response: %s', tsg) + else: + self.logger.error('CAhandler._container_id_lookup() no target-system-groups found for filter: %s.', self.container_info_dic['name']) + self.logger.debug('CAhandler._container_id_lookup() ended with: %s', str(self.container_info_dic['id'])) + def _login(self): """ _login into NCLM API """ self.logger.debug('CAhandler._login()') @@ -344,20 +464,48 @@ def _login(self): _realms = json_dic.get('realms', None) self.logger.debug('login response:\n user: %s\n token: %s\n realms: %s\n', _username, json_dic['access_token'], _realms) else: - self.logger.error('CAhandler._login(): No token returned. Aborting...') + self.logger.error('CAhandler._login(): No token returned. Aborting.') else: self.logger.error('CAhandler._login() error during post: %s', api_response.status_code) else: # If response code is not ok (200), print the resulting http error code with description self.logger.error('CAhandler._login() error during get: %s', api_response.status_code) + def _revocation_status_poll(self, job_id: int, err_dic: Dict[str, str]) -> Tuple[int, str, str]: + """ poll status of revocation job """ + self.logger.debug('CAhandler._revocation_status_poll()') + + cnt = 0 + while cnt < 10: + response = requests.get(f"{self.api_host}{self.api_version}/jobs/{job_id}", headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() + if 'status' in response and response['status'] in ['done', 'failed']: + if response['status'] == 'done': + code = 200 + message = None + detail = None + elif response['status'] == 'failed': + code = 500 + message = err_dic['serverinternal'] + detail = 'Revocation operation failed: error from API' + break + time.sleep(self.wait_interval) + cnt += 1 + + if cnt == 10: + code = 500 + message = err_dic['serverinternal'] + detail = 'Revocation operation failed: Timeout' + + self.logger.debug('CAhandler._revocation_status_poll() ended with: %s', code) + return (code, message, detail) + def _template_list_get(self, ca_id: int) -> Dict[str, str]: """ get list of templates """ self.logger.debug('CAhandler._template_list_get(%s)', ca_id) try: template_list = requests.get(f"{self.api_host}{self.api_version}/containers/{self.container_info_dic['id']}/issuers/{ca_id}/templates", headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() except Exception as err_: - self.logger.error('CAhandler._template_id_lookup() returned error: %s', err_) + self.logger.error('CAhandler._template_list_get() returned error: %s', err_) template_list = [] if 'items' in template_list: @@ -389,74 +537,27 @@ def _template_id_lookup(self, ca_id: int): if 'items' in template_list: self._templates_enumerate(template_list) else: - self.logger.error('CAhandler._template_id_lookup() no templates found for filter: %s...', self.template_info_dic['name']) + self.logger.error('CAhandler._template_id_lookup() no templates found for filter: %s.', self.template_info_dic['name']) self.logger.debug('CAhandler._template_id_lookup() ended with: %s', str(self.template_info_dic['id'])) - def _container_id_lookup(self): - """ get target system id based on name """ - self.logger.debug('CAhandler._container_id_lookup() for tsg: %s', self.container_info_dic['name']) - try: - tsg_list = requests.get(self.api_host + '/containers?freeText=' + str(self.container_info_dic['name']) + '&offset=0&limit=50&fetchPath=true', headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() - except Exception as err_: - self.logger.error('CAhandler._container_id_lookup() returned error: %s', err_) - tsg_list = [] - - if 'items' in tsg_list: - for tsg in tsg_list['items']: - if 'name' in tsg and 'id' in tsg: - if self.container_info_dic['name'] == tsg['name']: - self.container_info_dic['id'] = tsg['id'] - break - else: - self.logger.error('CAhandler._container_id_lookup() incomplete response: %s', tsg) - else: - self.logger.error('CAhandler._container_id_lookup() no target-system-groups found for filter: %s...', self.container_info_dic['name']) - self.logger.debug('CAhandler._container_id_lookup() ended with: %s', str(self.container_info_dic['id'])) - - def _csr_post(self, csr: str, policylink_id: int) -> Dict[str, str]: - """ post csr """ - self.logger.debug('CAhandler._csr_post()') - - job_id = None - # build_pem_file - csr = build_pem_file(self.logger, None, csr, 64, True) - csr = b64_encode(self.logger, convert_string_to_byte(csr)) - data_dic = {'allowDuplicateCn': True, 'request': {'pkcs10': csr}} - - # add template if correctly configured - if 'id' in self.template_info_dic and self.template_info_dic['id']: - data_dic['template'] = {'id': self.template_info_dic['id']} - - response = self._api_post(f"{self.api_host}{self.api_version}/containers/{self.container_info_dic['id']}/issuers/{policylink_id}/csr", data_dic) - - if 'id' in response: - job_id = response['id'] - - self.logger.debug('CAhandler._csr_post() ended with: %s', job_id) - return job_id - - def _cert_enroll(self, csr: str, policylink_id: int) -> Tuple[str, str, str]: - """ enroll operation """ - self.logger.debug('CAhandler._cert_enroll()') + def _enroll(self, csr: str, ca_id: int) -> Tuple[str, str, str, str]: + """ enroll certificate from NCLM """ + self.logger.debug('CAhandler._enroll()') error = None cert_bundle = None cert_raw = None cert_id = None - # post csr - job_id = self._csr_post(csr, policylink_id) - - if job_id: - cert_id = self._cert_id_get(job_id) - - if cert_id: - (error, cert_bundle, cert_raw) = self._cert_bundle_build(cert_id) + if ca_id and self.container_info_dic['id']: + # enroll operation + (error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id) else: - self.logger.error('CAhandler.eroll(): certifcate_id lookup failed for job: %s', job_id) + error = f'Enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}' + self.logger.error('CAhandler.eroll(): Enrollment aborted. ca_id: %s, container: %s', ca_id, self.container_info_dic['id']) - self.logger.debug('CAhandler._cert_enroll() ended with error: %s', error) + self.logger.debug('CAhandler._enroll() ended with: %s', error) return (error, cert_bundle, cert_raw, cert_id) def enroll(self, csr: str) -> Tuple[str, str, str, str]: @@ -480,15 +581,16 @@ def enroll(self, csr: str) -> Tuple[str, str, str, str]: if ca_id and self.template_info_dic['name'] and not self.template_info_dic['id']: self._template_id_lookup(ca_id) - if ca_id and self.container_info_dic['id']: - # enroll operation - (error, cert_bundle, cert_raw, cert_id) = self._cert_enroll(csr, ca_id) + # check for eab profiling and header_info + error = eab_profile_header_info_check(self.logger, self, csr, 'profile_id') + if not error: + (error, cert_bundle, cert_raw, cert_id) = self._enroll(csr, ca_id) else: - error = f'enrollment aborted. ca: {ca_id}, tsg_id: {self.container_info_dic["id"]}' - self.logger.error('CAhandler.eroll(): enrollment aborted. ca_id: %s, tsg_id: %s', ca_id, self.container_info_dic['id']) + self.logger.error('CAhandler.eroll(): EAB profile lookup failed with error: %s', error) else: - error = f'CAhandler.eroll(): ID lookup for targetSystemGroup "{self.container_info_dic["name"]}" failed.' + error = f'CAhandler.eroll(): ID lookup for container"{self.container_info_dic["name"]}" failed.' else: + error = self.error self.logger.error(self.error) self.logger.debug('CAhandler.enroll() ended') @@ -510,40 +612,28 @@ def revoke(self, cert: str, rev_reason: str = 'unspecified', rev_date: str = uts """ revoke certificate """ self.logger.debug('CAhandler.revoke()') - # get serial from pem file and convert to formated hex - serial = f'{cert_serial_get(self.logger, cert, hexformat=True)}' - hex_serial = ':'.join(serial[i:i + 2] for i in range(0, len(serial), 2)) + err_dic = error_dic_get(self.logger) - # search for certificate - try: - cert_list = requests.get(self.api_host + '/certificates?freeText==' + str(hex_serial) + '&stateCurrent=false&stateHistory=false&stateWaiting=false&stateManual=false&stateUnattached=false&expiresAfter=%22%22&expiresBefore=%22%22&sortAttribute=createdAt&sortOrder=desc&containerId=' + str(self.container_info_dic['id']), headers=self.headers, verify=self.ca_bundle, proxies=self.proxy, timeout=self.request_timeout).json() - except Exception as err_: - self.logger.error('CAhandler.revoke(): request get aborted with err: %s', err_) - cert_list = [] + code = 500 + message = err_dic['serverinternal'] + detail = 'Revocation operation failed' - err_dic = error_dic_get(self.logger) - if 'certificates' in cert_list: - try: - cert_id = cert_list['certificates'][0]['certificateId'] - data_dic = {'reason': rev_reason, 'time': rev_date} - try: - detail = self._api_post(self.api_host + '/certificates/' + str(cert_id) + '/revocationrequest', data_dic) - code = 200 - message = None - except Exception as err: - self.logger.error('CAhandler.revoke(): _api_post got aborted with err: %s', err) - code = 500 - message = err_dic['serverinternal'] - detail = 'Revocation operation failed' - except Exception: - code = 404 - message = err_dic['serverinternal'] - detail = 'CertificateID could not be found' - else: - code = 404 - message = err_dic['serverinternal'] - detail = 'Cert could not be found' + # get tracking id as input for revocation call + cert_id = self._cert_id_lookup(cert) + + if cert_id: + data_dic = {'reason': rev_reason, 'time': rev_date} + response = self._api_post(f"{self.api_host}{self.api_version}/certificates/{cert_id}/revoke", data_dic) + if 'urls' in response and 'job' in response['urls']: + job_id = response['urls']['job'].replace('/v2/jobs/', '') + else: + job_id = None + self.logger.error('CAhandler.revoke(): job_id lookup failed for certificate: %s', cert_id) + + if job_id: + (code, message, detail) = self._revocation_status_poll(job_id, err_dic) + self.logger.debug('CAhandler.revoke() ended with: %s', code) return (code, message, detail) def trigger(self, _payload: str) -> Tuple[str, str, str]: diff --git a/test/test_nclm_ca_handler.py b/test/test_nclm_ca_handler.py index acb2b990..ee83c2c6 100644 --- a/test/test_nclm_ca_handler.py +++ b/test/test_nclm_ca_handler.py @@ -55,335 +55,6 @@ def test_004__api_post(self, mock_req): mockresponse.json = Exception('json_exc') self.assertEqual({'status': 'status_code'}, self.cahandler._api_post('url', 'data')) - @patch('requests.get') - def test_005_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns nothing """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:ca_id.lookup() no CAs found in response ...', lcm.output) - - @patch('requests.get') - def test_006_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns wrong data """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'foo': 'bar'} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:ca_id.lookup() no CAs found in response ...', lcm.output) - - @patch('requests.get') - def test_007_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns wrong data """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'foo': 'bar'} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:ca_id.lookup() no CAs found in response ...', lcm.output) - - @patch('requests.get') - def test_008_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns empty ca-list """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': []} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_009_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns wrong ca-list """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'foo': 'foo'}]} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_010_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with name not matching """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'name': 'foo'}]} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_011_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with name matching but no id """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'name': 'ca_name'}]} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_012_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with name matching and id """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'name': 'ca_name', 'id': 'id'}]} - self.assertEqual('id', self.cahandler._ca_id_lookup()) - - @patch('requests.get') - def test_013_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with desc not matching """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'desc': 'foo'}]} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_014_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with desc matching but no id """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'desc': 'ca_name'}]} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._ca_id_lookup()) - self.assertIn('ERROR:test_a2c:_ca_id_lookup(): no ca id found for ca_name', lcm.output) - - @patch('requests.get') - def test_015_ca_id_lookup(self, mock_req): - """ CAhandler._ca_id_lookup() returns ca-list with desc matching and id """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_name = 'ca_name' - self.cahandler.headers = 'headers' - mockresponse = Mock() - mock_req.return_value = mockresponse - mockresponse.json = lambda: {'CAs': [{'desc': 'ca_name', 'id': 'id'}]} - self.assertEqual('id', self.cahandler._ca_id_lookup()) - - @patch('requests.get') - def test_016_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns everything with one ca cert """ - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'der': 'der', 'pem': 'pem', 'issuerInfo': {'id': 'id'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificate': {'pem': 'pemca'}} - mock_get.side_effect = [mockresponse1, mockresponse2] - self.assertEqual((None, 'pempemca', 'der'), self.cahandler._cert_bundle_build('foo')) - - @patch('requests.get') - def test_017_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns everything with two ca cert """ - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'der': 'der', 'pem': 'pem', 'issuerInfo': {'id': 'id1'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificate': {'pem': 'pemca1', 'issuerInfo': {'id': 'id'}}} - mockresponse3 = Mock() - mockresponse3.json = lambda: {'certificate': {'pem': 'pemca2', 'issuerInfo': {'id': 'id'}}} - mock_get.side_effect = [mockresponse1, mockresponse2, mockresponse3] - self.assertEqual((None, 'pempemca1pemca2', 'der'), self.cahandler._cert_bundle_build('foo')) - - @patch('requests.get') - def test_018_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns everything without der in cert_dic """ - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'pem': 'pem', 'issuerInfo': {'id': 'id'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificate': {'pem': 'pemca'}} - mock_get.side_effect = [mockresponse1, mockresponse2] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual(('no der certificate returned for id foo', 'pempemca', None), self.cahandler._cert_bundle_build('foo')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_bundle_build(): no der certificate returned for id: foo', lcm.output) - - @patch('requests.get') - def test_019_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns everything without pem in cert_dic """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_id_list = [1] - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'der': 'der', 'issuerInfo': {'id': 'id'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificate': {'pem': 'pemca'}} - mock_get.side_effect = [mockresponse1, mockresponse2] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual(('no pem certificate returned for id foo', 'pemca', 'der'), self.cahandler._cert_bundle_build('foo')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_bundle_build(): no pem certificate returned for id: foo', lcm.output) - - @patch('requests.get') - def test_020_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns everything without pem in ca_dic """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_id_list = [1] - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'der': 'der', 'pem': 'pem', 'issuerInfo': {'id': 'id'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificate': {'foo': 'bar'}} - mock_get.side_effect = [mockresponse1, mockresponse2] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual(('no pem certificate returned for id id', 'pem', 'der'), self.cahandler._cert_bundle_build('foo')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_bundle_build(): no pem certificate returned for id: id', lcm.output) - - @patch('requests.get') - def test_021_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns wrong ca_dic """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_id_list = [1] - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificate': {'der': 'der', 'pem': 'pem', 'issuerInfo': {'id': 'id'}}} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'foo': 'bar'} - mock_get.side_effect = [mockresponse1, mockresponse2] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual(('invalid reponse returned for id: id', 'pem', 'der'), self.cahandler._cert_bundle_build('foo')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_bundle_build(): invalid reponse returned for id: id', lcm.output) - - @patch('requests.get') - def test_022_ca_id_lookup(self, mock_get): - """ CAhandler._cert_bundle_build() returns wrong cert_dic """ - self.cahandler.api_host = 'api_host' - self.cahandler.ca_id_list = [1] - mockresponse1 = Mock() - mockresponse1.json = lambda: {'foo': 'bar'} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'foo': 'bar'} - mock_get.side_effect = [mockresponse1, mockresponse2] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual(('invalid reponse returned for id: foo', None, None), self.cahandler._cert_bundle_build('foo')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_bundle_build(): invalid reponse returned for id: foo', lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_023_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - one cert - ok """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName', 'certificateId': 1}] - mock_comp.return_value = True - self.assertEqual(1, self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_024_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - two certs - match 2nd entry in list """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName1', 'certificateId': 1}, {'subjectAltName': 'subjectAltName2', 'certificateId': 2}] - mock_comp.side_effect = [True, False] - self.assertEqual(2, self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_025_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - no cn two certs - match 2nd entry in list """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName1', 'certificateId': 1}, {'subjectAltName': 'subjectAltName2', 'certificateId': 2}] - mock_comp.side_effect = [True, False] - self.assertEqual(2, self.cahandler._cert_id_lookup(None, 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_026_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - two certs - match 1st entry in list """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName1', 'certificateId': 1}, {'subjectAltName': 'subjectAltName2', 'certificateId': 2}] - mock_comp.side_effect = [False, True] - self.assertEqual(1, self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_027_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - no certificateid return from nclm """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName', 'foo': 'bar'}] - mock_comp.return_value = True - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - self.assertIn("ERROR:test_a2c:_cert_id_lookup(): response incomplete: 'certificateId'", lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_028_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - two certs match """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'subjectAltName': 'subjectAltName1', 'certificateId': 1}, {'subjectAltName': 'subjectAltName2', 'certificateId': 2}] - mock_comp.side_effect = [True, True] - self.assertEqual(2, self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_029_cert_id_lookup(self, mock_fetch, mock_comp): - """ CAhandler._cert_id_lookup() - one cert - no san in """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = [{'foo': 'bar', 'certificateId': 'certificateId'}] - mock_comp.return_value = True - self.assertFalse(self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_030_cert_id_lookup(self, mock_req, mock_comp): - """ CAhandler._cert_id_lookup() - no san_list in function """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'certificates': [{'subjectAltName': 'subjectAltName', 'certificateId': 'certificateId'}]} - mock_req.return_value = mockresponse - mock_comp.return_value = True - self.assertFalse(self.cahandler._cert_id_lookup('csr_cn', None)) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._san_compare') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_031_cert_id_lookup(self, mock_fetch, mock_comp,): - """ CAhandler._cert_id_lookup() - _cert_list_fetch() does not return anything """ - self.cahandler.api_host = 'api_host' - mock_fetch.return_value = None - mock_comp.return_value = True - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - self.assertIn('ERROR:test_a2c:_cert_id_lookup(): no certificates found for csr_cn', lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_list_fetch') - def test_032_cert_id_lookup(self, mock_fetch): - """ CAhandler._cert_id_lookup() - request raises exception """ - self.cahandler.api_host = 'api_host' - mock_fetch.side_effect = Exception('req_exc') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._cert_id_lookup('csr_cn', 'san_list')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_id_lookup() returned error: req_exc', lcm.output) - def test_033__config_check(self): """ CAhandler._config.check() no api_host """ with self.assertLogs('test_a2c', level='INFO') as lcm: @@ -439,7 +110,7 @@ def test_039__config_check(self): """ CAhandler._config.check() no tsg_name """ self.cahandler.api_host = 'api_host' self.cahandler.credential_dic = {'api_user': 'api_user', 'api_password': 'api_password'} - self.cahandler.tsg_info_dic = {'name': False} + self.cahandler.container_info_dic = {'name': False} with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._config_check() self.assertEqual('tsg_name to be set in config file', self.cahandler.error) @@ -449,7 +120,7 @@ def test_040__config_check(self): """ CAhandler._config.check() no ca_name """ self.cahandler.api_host = 'api_host' self.cahandler.credential_dic = {'api_user': 'api_user', 'api_password': 'api_password'} - self.cahandler.tsg_info_dic = {'name': 'name'} + self.cahandler.container_info_dic = {'name': 'name'} with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._config_check() self.assertEqual('ca_name to be set in config file', self.cahandler.error) @@ -459,7 +130,7 @@ def test_041__config_check(self): """ CAhandler._config.check() ca_bundle False """ self.cahandler.api_host = 'api_host' self.cahandler.credential_dic = {'api_user': 'api_user', 'api_password': 'api_password'} - self.cahandler.tsg_info_dic = {'name': 'name'} + self.cahandler.container_info_dic = {'name': 'name'} self.cahandler.ca_name = 'ca_name' self.cahandler.ca_id_list = ['id1', 'id2'] self.cahandler.ca_bundle = False @@ -475,11 +146,9 @@ def test_042_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -489,11 +158,9 @@ def test_043_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertEqual('api_host', self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -503,11 +170,9 @@ def test_044_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': 'api_user', 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -517,11 +182,9 @@ def test_045_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': 'api_password'}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -531,11 +194,9 @@ def test_046_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertEqual('ca_name', self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -545,11 +206,9 @@ def test_047_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': 'tsg_name', 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertTrue(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -559,11 +218,9 @@ def test_048_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertEqual('ca_bundle', self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -573,11 +230,9 @@ def test_049_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertFalse(self.cahandler.ca_bundle) self.assertEqual({'name': None, 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -587,10 +242,8 @@ def test_050_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) self.assertFalse(self.cahandler.ca_name) self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) self.assertEqual(20, self.cahandler.request_timeout) @patch.dict('os.environ', {'api_user_var': 'user_var'}) @@ -601,7 +254,6 @@ def test_051_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': 'user_var', 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch.dict('os.environ', {'api_user_var': 'user_var'}) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -613,7 +265,6 @@ def test_052_config_load(self, mock_load_cfg): self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) self.assertIn("ERROR:test_a2c:CAhandler._config_load() could not load user_variable:'does_not_exist'", lcm.output) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch.dict('os.environ', {'api_user_var': 'user_var'}) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -625,7 +276,6 @@ def test_053_config_load(self, mock_load_cfg): self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': 'api_user', 'api_password': None}, self.cahandler.credential_dic) self.assertIn('INFO:test_a2c:CAhandler._config_load() overwrite api_user', lcm.output) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch.dict('os.environ', {'api_password_var': 'password_var'}) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -635,7 +285,6 @@ def test_054_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': 'password_var'}, self.cahandler.credential_dic) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch.dict('os.environ', {'api_password_var': 'password_var'}) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -647,7 +296,6 @@ def test_055_config_load(self, mock_load_cfg): self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) self.assertIn("ERROR:test_a2c:CAhandler._config_load() could not load password_variable:'does_not_exist'", lcm.output) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch.dict('os.environ', {'api_password_var': 'password_var'}) @patch('examples.ca_handler.nclm_ca_handler.load_config') @@ -659,7 +307,6 @@ def test_056_config_load(self, mock_load_cfg): self.assertFalse(self.cahandler.api_host) self.assertEqual({'api_password': 'api_password', 'api_user': None}, self.cahandler.credential_dic) self.assertIn('INFO:test_a2c:CAhandler._config_load() overwrite api_password', lcm.output) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch('examples.ca_handler.nclm_ca_handler.parse_url') @patch('json.loads') @@ -672,7 +319,6 @@ def test_057_config_load(self, mock_load_cfg, mock_json, mock_url): self.cahandler._config_load() self.assertTrue(mock_json.called) self.assertTrue(mock_url.called) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch('examples.ca_handler.nclm_ca_handler.proxy_check') @patch('examples.ca_handler.nclm_ca_handler.parse_url') @@ -689,7 +335,6 @@ def test_058_config_load(self, mock_load_cfg, mock_json, mock_url, mock_chk): self.assertTrue(mock_url.called) self.assertTrue(mock_chk.called) self.assertEqual({'http': 'proxy.bar.local', 'https': 'proxy.bar.local'},self.cahandler.proxy ) - self.assertEqual(300, self.cahandler.request_delta_treshold) @patch('examples.ca_handler.nclm_ca_handler.proxy_check') @patch('examples.ca_handler.nclm_ca_handler.parse_url') @@ -708,31 +353,6 @@ def test_059_config_load(self, mock_load_cfg, mock_json, mock_url, mock_chk): self.assertFalse(mock_chk.called) self.assertFalse(self.cahandler.proxy ) self.assertIn('WARNING:test_a2c:Challenge._config_load() proxy_server_list failed with error: not enough values to unpack (expected 2, got 1)', lcm.output) - self.assertEqual(300, self.cahandler.request_delta_treshold) - - @patch('examples.ca_handler.nclm_ca_handler.load_config') - def test_060_config_load(self, mock_load_cfg): - """ CAhandler._config_load request_delta_treshold """ - mock_load_cfg.return_value = {'CAhandler': {'request_delta_treshold': 60}} - self.cahandler._config_load() - self.assertFalse(self.cahandler.api_host) - self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) - self.assertFalse(self.cahandler.ca_name) - self.assertEqual(60, self.cahandler.request_delta_treshold) - - @patch('examples.ca_handler.nclm_ca_handler.load_config') - def test_061_config_load(self, mock_load_cfg): - """ CAhandler._config_load request_delta_treshold string """ - mock_load_cfg.return_value = {'CAhandler': {'request_delta_treshold': 'aaa'}} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.cahandler._config_load() - self.assertFalse(self.cahandler.api_host) - self.assertEqual({'api_user': None, 'api_password': None}, self.cahandler.credential_dic) - self.assertEqual({'name': None, 'id': None}, self.cahandler.tsg_info_dic) - self.assertFalse(self.cahandler.ca_name) - self.assertEqual(300, self.cahandler.request_delta_treshold) - self.assertIn('ERROR:test_a2c:CAhandler._config_load() could not load request_delta_treshold:aaa', lcm.output) @patch('examples.ca_handler.nclm_ca_handler.load_config') def test_062_config_load(self, mock_load_cfg): @@ -748,241 +368,27 @@ def test_063_config_load(self, mock_load_cfg): self.cahandler._config_load() self.assertEqual(20, self.cahandler.request_timeout) - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_064__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - all ok """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'subjectName': 'CN=csr_cn, O=bar', 'requestID': 'requestID'}] - self.assertEqual('requestID', self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_065__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - no requestID in list exception triggered """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'subjectName': 'CN=csr_cn, O=bar'}] - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - self.assertIn("ERROR:test_a2c:_csr_id_lookup(): response incomplete: 'requestID'", lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_066__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - cn in mock_unureq not correctly ordered """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'subjectName': 'O=bar, CN=csr_cn', 'requestID': 'requestID'}] - self.assertEqual('requestID', self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_067__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - empty subjectName """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'subjectName': '', 'requestID': 'requestID'}] - self.assertFalse(self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_068__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - no subjectName """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - self.assertFalse(self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_069__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_lastreq): - """ CAhandler._csr_id_lookup - requests to old """ - mock_utsnow.return_value = 1000 - mock_uts.return_value = 100 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'subjectName': 'O=bar, CN=csr_cn', 'requestID': 'requestID'}] - self.assertFalse(self.cahandler._csr_id_lookup('csr_cn', ['csr_san_list'])) - self.assertFalse(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_070__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn one san """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'requestId': 1}] - mock_san.return_value = ['csr_san_list'] - self.assertEqual(1, self.cahandler._csr_id_lookup(None, ['csr_san_list'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_071__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn two sans """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'requestId': 1}] - mock_san.return_value = ['san1', 'san2'] - self.assertEqual(1, self.cahandler._csr_id_lookup(None, ['san1', 'san2'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_072__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn two sans to be reordered """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'requestId': 1}] - mock_san.return_value = ['san1', 'san2'] - self.assertEqual(1, self.cahandler._csr_id_lookup(None, ['san2', 'san1'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_073__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn no requestID """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'foo': 'bar'}] - mock_san.return_value = ['san2', 'san1'] - self.assertFalse(self.cahandler._csr_id_lookup(None, ['san1', 'san2'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_074__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn sans are not matching """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'requestID': 'requestID'}] - mock_san.return_value = ['san1'] - self.assertFalse(self.cahandler._csr_id_lookup(None, ['san1', 'san2'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_075__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn no pkcs10 """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 'requestID'}] - mock_lastreq.return_value = [{'pkcs10': 'pkcs10', 'requestId': 1}] - mock_san.return_value = ['san1'] - self.assertFalse(self.cahandler._csr_id_lookup(None, ['san1', 'san2'])) - self.assertTrue(mock_lastreq.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._lastrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.date_to_uts_utc') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._unusedrequests_get') - @patch('examples.ca_handler.nclm_ca_handler.uts_now') - def test_076__csr_id_lookup(self, mock_utsnow, mock_unureq, mock_uts, mock_san, mock_lastreq): - """ CAhandler._csr_id_lookup - no csr_cn trigger excption in loop """ - self.cahandler.api_host = 'api_host' - mock_utsnow.return_value = 1000 - mock_uts.return_value = 900 - mock_unureq.return_value = [{'addedAt': 'addedAt', 'requestID': 1}] - mock_lastreq.return_value = [{'foo': 'bar', 'requestID': 'requestID'}] - - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._csr_id_lookup(None, ['san1', 'san2'], 'pkcs10')) - self.assertTrue(mock_lastreq.called) - self.assertIn("ERROR:test_a2c:_csr_id_lookup(): response incomplete: 'requestId'", lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - def test_077__request_import(self, mock_req): - """ CAhandler._request_import """ - self.cahandler.api_host = 'api_host' - mock_req.return_value = 'foo' - self.assertEqual('foo', self.cahandler._request_import('csr')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - def test_078__request_import(self, mock_req): - """ CAhandler._request_import - req raises an exception """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('exc_req_import') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._request_import('csr')) - self.assertIn('ERROR:test_a2c:CAhandler._request_import() returned error: exc_req_import', lcm.output) - - @patch('requests.get') - def test_079__unusedrequests_get(self, mock_req): - """ CAhandler._unusedrequests_get """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'foo': 'bar'} - mock_req.return_value = mockresponse - self.assertEqual({'foo': 'bar'}, self.cahandler._unusedrequests_get()) - - @patch('requests.get') - def test_080__unusedrequests_get(self, mock_req): - """ CAhandler._unusedrequests_get """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('exc_req_unused') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._unusedrequests_get()) - self.assertIn('ERROR:test_a2c:CAhandler._unusedrequests_get() returned error: exc_req_unused', lcm.output) + @patch('examples.ca_handler.nclm_ca_handler.load_config') + def test_064_config_load(self, mock_load_cfg): + """ CAhandler._config_load """ + mock_load_cfg.return_value = {'CAhandler': {'container_name': 'container_name'}} + self.cahandler._config_load() + self.assertEqual({'name': 'container_name', 'id': None}, self.cahandler.container_info_dic) + @patch('requests.post') @patch('requests.get') - def test_081__login(self, mock_get): + def test_081__login(self, mock_get, mock_post): """ CAhandler._unusedrequests_get """ self.cahandler.api_host = 'api_host' mockresponse1 = Mock() - mockresponse1.status_code = 'foo' + mockresponse1.status_code = '500' mockresponse1.ok = None - # mockresponse1.raise_for_status = Mock(return_value='status') mock_get.return_value = mockresponse1 with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._login() + self.assertIn('ERROR:test_a2c:CAhandler._login() error during get: 500', lcm.output) + self.assertFalse(mock_post.called) self.assertFalse(self.cahandler.headers) - self.assertIn('ERROR:test_a2c:CAhandler._login() error during get: foo', lcm.output) @patch('requests.post') @patch('requests.get') @@ -990,99 +396,64 @@ def test_082__login(self, mock_get, mock_post): """ CAhandler._unusedrequests_get """ self.cahandler.api_host = 'api_host' mockresponse1 = Mock() - mockresponse1.status_code = lambda: 'foo' + mockresponse1.status_code = '200' + mockresponse1.json = lambda: {'versionNumber': 'versionNumber'} + mockresponse1.ok = True mock_get.return_value = mockresponse1 mockresponse2 = Mock() - mockresponse2.json = lambda: {'access_token': 'access_token', 'username': 'username', 'realms': 'realms'} + mockresponse2.status_code = '500' + mockresponse2.json = lambda: {'foo': 'bar', 'username': 'username', 'realms': 'realms'} + mockresponse2.ok = None mock_post.return_value = mockresponse2 - self.cahandler._login() - self.assertEqual({'Authorization': 'Bearer access_token'}, self.cahandler.headers) + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.cahandler._login() + self.assertIn('ERROR:test_a2c:CAhandler._login() error during post: 500', lcm.output) + self.assertTrue(mock_post.called) + self.assertFalse(self.cahandler.headers) + self.assertEqual('versionNumber', self.cahandler.nclm_version) @patch('requests.post') @patch('requests.get') def test_083__login(self, mock_get, mock_post): - """ CAhandler._unusedrequests_get mock_post without username""" - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.status_code = lambda: 'foo' - mock_get.return_value = mockresponse1 - mockresponse2 = Mock() - mockresponse2.json = lambda: {'access_token': 'access_token', 'foo': 'bar', 'realms': 'realms'} - mock_post.return_value = mockresponse2 - self.cahandler._login() - self.assertEqual({'Authorization': 'Bearer access_token'}, self.cahandler.headers) - - @patch('requests.post') - @patch('requests.get') - def test_084__login(self, mock_get, mock_post): - """ CAhandler._unusedrequests_get mock_post without username""" + """ CAhandler._unusedrequests_get """ self.cahandler.api_host = 'api_host' mockresponse1 = Mock() - mockresponse1.status_code = lambda: 'foo' + mockresponse1.status_code = '200' + mockresponse1.json = lambda: {'versionNumber': 'versionNumber'} + mockresponse1.ok = True mock_get.return_value = mockresponse1 mockresponse2 = Mock() - mockresponse2.ok = None - mockresponse2.status_code = 'foo2' + mockresponse2.status_code = '200' + mockresponse2.json = lambda: {'foo': 'bar', 'username': 'username', 'realms': 'realms'} + mockresponse2.ok = True mock_post.return_value = mockresponse2 with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._login() + self.assertIn('ERROR:test_a2c:CAhandler._login(): No token returned. Aborting.', lcm.output) + self.assertTrue(mock_post.called) self.assertFalse(self.cahandler.headers) - self.assertIn('ERROR:test_a2c:CAhandler._login() error during post: foo2', lcm.output) + self.assertEqual('versionNumber', self.cahandler.nclm_version) @patch('requests.post') @patch('requests.get') - def test_085__login(self, mock_get, mock_post): - """ CAhandler._unusedrequests_get mock_post without realms""" + def test_084__login(self, mock_get, mock_post): + """ CAhandler._unusedrequests_get """ self.cahandler.api_host = 'api_host' mockresponse1 = Mock() - mockresponse1.status_code = lambda: 'foo' + mockresponse1.status_code = '200' + mockresponse1.json = lambda: {'versionNumber': 'versionNumber'} + mockresponse1.ok = True mock_get.return_value = mockresponse1 mockresponse2 = Mock() - mockresponse2.json = lambda: {'access_token': 'access_token', 'username': 'username', 'foo': 'bar'} + mockresponse2.status_code = '200' + mockresponse2.json = lambda: {'access_token': 'access_token', 'username': 'username', 'realms': 'realms'} + mockresponse2.ok = True mock_post.return_value = mockresponse2 self.cahandler._login() + self.assertTrue(mock_post.called) self.assertEqual({'Authorization': 'Bearer access_token'}, self.cahandler.headers) + self.assertEqual('versionNumber', self.cahandler.nclm_version) - @patch('requests.post') - @patch('requests.get') - def test_086__login(self, mock_get, mock_post): - """ CAhandler._unusedrequests_get mock_post without access tooken""" - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.status_code = lambda: 'foo' - mockresponse1.ok = 'ok' - mock_get.return_value = mockresponse1 - mockresponse2 = Mock() - mockresponse2.json = lambda: {'foo': 'bar', 'username': 'username', 'realms': 'realms'} - mock_post.return_value = mockresponse2 - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.cahandler._login() - self.assertFalse(self.cahandler.headers) - self.assertIn('ERROR:test_a2c:CAhandler._login(): No token returned. Aborting...', lcm.output) - - def test_087__san_compare(self): - """ CAhandler._san_compare all ok """ - csr_san_list = ['foo:foo'] - cert_san_list = {'foo': ['foo']} - self.assertTrue(self.cahandler._san_compare(csr_san_list, cert_san_list)) - - def test_088__san_compare(self): - """ CAhandler._san_compare multiple """ - csr_san_list = ['foo:foo', 'foo:bar'] - cert_san_list = {'foo': ['foo', 'bar']} - self.assertTrue(self.cahandler._san_compare(csr_san_list, cert_san_list)) - - def test_089__san_compare(self): - """ CAhandler._san_compare multiple """ - csr_san_list = ['foo:foo,foo:bar'] - cert_san_list = {'foo': ['foo', 'bar']} - self.assertTrue(self.cahandler._san_compare(csr_san_list, cert_san_list)) - - def test_090__san_compare(self): - """ CAhandler._san_compare multiple """ - csr_san_list = ['foo:foo,foo:bar1'] - cert_san_list = {'foo': ['foo', 'bar']} - self.assertFalse(self.cahandler._san_compare(csr_san_list, cert_san_list)) def test_091_poll(self): """ CAhandler.poll() """ @@ -1093,244 +464,125 @@ def test_092_trigger(self): self.assertEqual(('Method not implemented.', None, None), self.cahandler.trigger('payload')) @patch('requests.get') - def test_093___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - all ok """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'targetSystemGroups': [{'name': 'name', 'id': 'id'}]} - mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} - self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': 'id'}, self.cahandler.tsg_info_dic) - - @patch('requests.get') - def test_094___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - multipe returned 1st matches """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'targetSystemGroups': [{'name': 'name', 'id': 'id'}, {'name': 'name1', 'id': 'id1'}]} - mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} - self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': 'id'}, self.cahandler.tsg_info_dic) - - @patch('requests.get') - def test_095___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - multipe returned 2nd matches """ + def test_093_container_id_lookup(self, mock_get): + """ CAhandler._container_id_lookup() """ self.cahandler.api_host = 'api_host' mockresponse = Mock() - mockresponse.json = lambda: {'targetSystemGroups': [{'name': 'name1', 'id': 'id1'}, {'name': 'name', 'id': 'id'}]} + mockresponse.json = lambda: {'items': [{'name': 'name1', 'id': 'id1'}, {'name': 'name2', 'id': 'id2'}]} mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name1', 'id': None} self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': 'id'}, self.cahandler.tsg_info_dic) + self.assertEqual({'name': 'name1', 'id': 'id1'}, self.cahandler.container_info_dic) @patch('requests.get') - def test_096___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - id is missing """ + def test_094_container_id_lookup(self, mock_get): + """ CAhandler._container_id_lookup() """ self.cahandler.api_host = 'api_host' mockresponse = Mock() - mockresponse.json = lambda: {'targetSystemGroups': [{'name': 'name'}]} + mockresponse.json = lambda: {'items': [{'name1': 'name1', 'id': 'id1'}, {'name': 'name2', 'id': 'id2'}]} mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': None} with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': None}, self.cahandler.tsg_info_dic) - self.assertIn("ERROR:test_a2c:CAhandler._container_id_lookup() incomplete response: {'name': 'name'}", lcm.output) + self.assertIn("ERROR:test_a2c:CAhandler._container_id_lookup() incomplete response: {'name1': 'name1', 'id': 'id1'}", lcm.output) + self.assertEqual({'name': 'name', 'id': None}, self.cahandler.container_info_dic) @patch('requests.get') - def test_097___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - name is missing """ + def test_095_container_id_lookup(self, mock_get): + """ CAhandler._container_id_lookup() """ self.cahandler.api_host = 'api_host' mockresponse = Mock() - mockresponse.json = lambda: {'targetSystemGroups': [{'foo': 'bar', 'id': 'id'}]} + mockresponse.json = lambda: {'foo': [{'name': 'name1', 'id': 'id1'}, {'name': 'name2', 'id': 'id2'}]} mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': None} with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': None}, self.cahandler.tsg_info_dic) - self.assertIn("ERROR:test_a2c:CAhandler._container_id_lookup() incomplete response: {'foo': 'bar', 'id': 'id'}", lcm.output) + self.assertIn('ERROR:test_a2c:CAhandler._container_id_lookup() no target-system-groups found for filter: name.', lcm.output) + self.assertEqual({'name': 'name', 'id': None}, self.cahandler.container_info_dic) @patch('requests.get') - def test_098___tsg_id_lookup(self, mock_get): - """ CAhandler._tsg_id_lookup() - targetSystemGroups is missing """ + def test_096_container_id_lookup(self, mock_req): + """ CAhandler._container_id_lookup() """ self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'tsg': [{'foo': 'bar', 'id': 'id'}]} - mock_get.return_value = mockresponse - self.cahandler.tsg_info_dic = {'name': 'name', 'id': None} + mock_req.side_effect = Exception('exc_container_id_lookup') + self.cahandler.container_info_dic = {'name': 'name', 'id': None} with self.assertLogs('test_a2c', level='INFO') as lcm: self.cahandler._container_id_lookup() - self.assertEqual({'name': 'name', 'id': None}, self.cahandler.tsg_info_dic) - self.assertIn('ERROR:test_a2c:CAhandler._container_id_lookup() no target-system-groups found for filter: name...', lcm.output) + self.assertIn('ERROR:test_a2c:CAhandler._container_id_lookup() returned error: exc_container_id_lookup', lcm.output) + self.assertIn('ERROR:test_a2c:CAhandler._container_id_lookup() no target-system-groups found for filter: name.', lcm.output) + self.assertEqual({'name': 'name', 'id': None}, self.cahandler.container_info_dic) - @patch('requests.get') - def test_099__tsg_id_lookup(self, mock_req): - """ CAhandler._request_import - req raises an exception """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('exc_container_id_lookup') + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._templates_enumerate') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_list_get') + def test_100__template_id_lookup(self, mock_list, mock_enum): + """ CAhandler._template_id_lookup """ + mock_list.return_value = {'foo': 'bar'} with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._container_id_lookup()) - self.assertIn('ERROR:test_a2c:CAhandler._container_id_lookup() returned error: exc_container_id_lookup', lcm.output) + self.cahandler._template_id_lookup('caid') + self.assertIn('ERROR:test_a2c:CAhandler._template_id_lookup() no templates found for filter: None.', lcm.output) + self.assertFalse(mock_enum.called) - @patch('requests.get') - def test_100__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - all ok """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'policyLinkId': 10, 'linkType': 'TEMPLATE'}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': 10}, self.cahandler.template_info_dic) + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._templates_enumerate') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_list_get') + def test_101__template_id_lookup(self, mock_list, mock_enum): + """ CAhandler._template_id_lookup """ + mock_list.return_value = {'items': ['foo', 'bar']} + self.cahandler._template_id_lookup('caid') + self.assertTrue(mock_enum.called) @patch('requests.get') - def test_101__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - linkId None """ + def test_102__template_list_get(self, mock_get): + """ CAhandler._template_id_lookup() """ self.cahandler.api_host = 'api_host' mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'linkId': None, 'linkType': 'TEMPLATE'}]}} + mockresponse.json = lambda: {'foo': 'bar'} mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) + self.assertEqual({'foo': 'bar'}, self.cahandler._template_list_get(6)) @patch('requests.get') - def test_102__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - No linkId """ + def test_103__template_list_get(self, mock_get): + """ CAhandler._template_id_lookup() """ self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'linkType': 'TEMPLATE'}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) + mock_get.side_effect = Exception('req_exc') + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertFalse(self.cahandler._template_list_get(6)) + self.assertIn('ERROR:test_a2c:CAhandler._template_list_get() returned error: req_exc', lcm.output) @patch('requests.get') - def test_103__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - no match in template names """ + def test_104__template_list_get(self, mock_get): + """ CAhandler._template_id_lookup() """ self.cahandler.api_host = 'api_host' mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'nomatch', 'allowed': True, 'linkId': 10, 'linkType': 'TEMPLATE'}]}} + mockresponse.json = lambda: {'items': 'bar'} mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_104__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - allowed false """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': False, 'linkId': 10, 'linkType': 'TEMPLATE'}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_105__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - template in lower cases """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'policyLinkId': 10, 'linkType': 'template'}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': 10}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_106__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - no template """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'linkId': 10, 'linkType': 'linkType'}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_107__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - no linktype """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': [{'displayName': 'template_name', 'allowed': True, 'linkId': 10}]}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_108__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - empty list """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'items': []}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_109__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - no items """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'template': {'blank': []}} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_110__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - wrong dict """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'foo': 'bar'} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_111__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - wrong dict """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'foo': 'bar'} - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_112__template_id_lookup(self, mock_get): - """ CAhandler._template_id_lookup() - empty response """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = None - mock_get.return_value = mockresponse - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - - @patch('requests.get') - def test_113__template_id_lookup(self, mock_req): - """ CAhandler._cert_id_lookup() - request raises exception """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('req_exc') - self.cahandler.template_info_dic = {'name': 'template_name', 'id': None} - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.cahandler._template_id_lookup() - self.assertEqual({'name': 'template_name', 'id': None}, self.cahandler.template_info_dic) - self.assertIn('ERROR:test_a2c:CAhandler._template_id_lookup() returned error: req_exc', lcm.output) + self.assertEqual({'items': 'bar'}, self.cahandler._template_list_get(6)) + + def test_105__templates_enumerate(self): + """ CAhandler._templates_enumerate() """ + template_list = {'items': [{'name': 'foo', 'id': 'id'}, {'name': 'foo1', 'id': 'id1'}]} + self.cahandler.template_info_dic = {'name': 'foo'} + self.cahandler._templates_enumerate(template_list) + self.assertEqual({'id': 'id', 'name': 'foo'}, self.cahandler.template_info_dic) + + def test_106__templates_enumerate(self): + """ CAhandler._templates_enumerate() """ + template_list = {'items': [{'name': 'foo', 'id': 'id'}, {'name': 'foo1', 'id': 'id1'}]} + self.cahandler.template_info_dic = {'name': 'foo1'} + self.cahandler._templates_enumerate(template_list) + self.assertEqual({'id': 'id1', 'name': 'foo1'}, self.cahandler.template_info_dic) + + def test_107__templates_enumerate(self): + """ CAhandler._templates_enumerate() """ + template_list = {'items': [{'name': 'foo', 'id': 'id'}, {'name': 'foo1', 'id': 'id1'}, {'name': 'foo', 'id': 'id2'}]} + self.cahandler.template_info_dic = {'name': 'foo'} + self.cahandler._templates_enumerate(template_list) + self.assertEqual({'id': 'id', 'name': 'foo'}, self.cahandler.template_info_dic) @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_114__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter """ self.cahandler.__enter__() @@ -1342,7 +594,7 @@ def test_114__enter__(self, mock_lookup, mock_login, mock_check, mock_load): @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_115__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter with host already defined """ self.cahandler.api_host = 'api_host' @@ -1355,7 +607,7 @@ def test_115__enter__(self, mock_lookup, mock_login, mock_check, mock_load): @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_116__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter with header defined """ self.cahandler.headers = 'header' @@ -1368,7 +620,7 @@ def test_116__enter__(self, mock_lookup, mock_login, mock_check, mock_load): @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_117__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter with error defined """ self.cahandler.error = 'error' @@ -1381,10 +633,10 @@ def test_117__enter__(self, mock_lookup, mock_login, mock_check, mock_load): @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_118__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter with tst_info_dic defined """ - self.cahandler.tsg_info_dic = {'id': 'foo'} + self.cahandler.container_info_dic = {'id': 'foo'} self.cahandler.__enter__() self.assertTrue(mock_load.called) self.assertTrue(mock_check.called) @@ -1394,10 +646,10 @@ def test_118__enter__(self, mock_lookup, mock_login, mock_check, mock_load): @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_load') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._config_check') @patch('examples.ca_handler.nclm_ca_handler.CAhandler._login') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._tsg_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._container_id_lookup') def test_119__enter__(self, mock_lookup, mock_login, mock_check, mock_load): """ test enter with error defined """ - self.cahandler.tsg_info_dic = {'id': 'foo'} + self.cahandler.container_info_dic = {'id': 'foo'} self.cahandler.error = 'error' self.cahandler.__enter__() self.assertTrue(mock_load.called) @@ -1405,351 +657,6 @@ def test_119__enter__(self, mock_lookup, mock_login, mock_check, mock_load): self.assertFalse(mock_login.called) self.assertFalse(mock_lookup.called) - @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') - @patch('requests.get') - def test_120_revoke(self, mock_get, mock_serial): - """ test revoke empty certificate list has been returned """ - self.cahandler.api_host = 'api_host' - mock_serial.return_value = 11 - mockresponse = Mock() - mockresponse.json = lambda: {'foo': 'bar'} - mock_get.return_value = mockresponse - self.assertEqual((404, 'urn:ietf:params:acme:error:serverInternal', 'Cert could not be found'), self.cahandler.revoke('cert', 'rev_reason', 'rev_date')) - - @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') - @patch('requests.get') - def test_121_revoke(self, mock_get, mock_serial): - """ test revoke request get aborted with exception """ - self.cahandler.api_host = 'api_host' - mock_serial.return_value = 11 - mock_get.side_effect = Exception('ex_req_get') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual((404, 'urn:ietf:params:acme:error:serverInternal', 'Cert could not be found'), self.cahandler.revoke('cert', 'rev_reason', 'rev_date')) - self.assertIn('ERROR:test_a2c:CAhandler.revoke(): request get aborted with err: ex_req_get', lcm.output) - - @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') - @patch('requests.get') - def test_122_revoke(self, mock_get, mock_serial): - """ test revoke certificates in certificate_list but content is bogus """ - self.cahandler.api_host = 'api_host' - mock_serial.return_value = 11 - mockresponse = Mock() - mockresponse.json = lambda: {'certificates': [{'foo': 'bar'}]} - mock_get.return_value = mockresponse - self.assertEqual((404, 'urn:ietf:params:acme:error:serverInternal', 'CertificateID could not be found'), self.cahandler.revoke('cert', 'rev_reason', 'rev_date')) - - @patch('requests.post') - @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') - @patch('requests.get') - def test_123_revoke(self, mock_get, mock_serial, mock_post): - """ test revoke certificates in certificate_list all good """ - self.cahandler.api_host = 'api_host' - mock_serial.return_value = 11 - mockresponse = Mock() - mockresponse.json = lambda: {'certificates': [{'certificateId': 100}]} - mock_get.return_value = mockresponse - mockresponse2 = Mock() - mockresponse2.json = lambda: {'foo': 'bar'} - mock_post.return_value = mockresponse2 - self.assertEqual((200, None, {'foo': 'bar'}), self.cahandler.revoke('cert', 'rev_reason', 'rev_date')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') - @patch('requests.get') - def test_124_revoke(self, mock_get, mock_serial, mock_post): - """ test revoke certificates in certificate_list but request.post returns execption """ - self.cahandler.api_host = 'api_host' - mock_serial.return_value = 11 - mockresponse = Mock() - mockresponse.json = lambda: {'certificates': [{'certificateId': 100}]} - mock_get.return_value = mockresponse - mock_post.side_effect = Exception('ex_req_post') - self.assertEqual((500, 'urn:ietf:params:acme:error:serverInternal', 'Revocation operation failed'), self.cahandler.revoke('cert', 'rev_reason', 'rev_date')) - - def test_125_enroll(self): - """ enroll() if there is an error """ - self.cahandler.error = 'foo' - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertEqual((None, None, None, None), self.cahandler.enroll('csr')) - self.assertIn('ERROR:test_a2c:foo', lcm.output) - - def test_126_enroll(self): - """ enroll() no target-system-id """ - self.cahandler.tsg_info_dic = {'id': None, 'name': 'name'} - self.assertEqual(('CAhandler.eroll(): ID lookup for targetSystemGroup "name" failed.', None, None, None), self.cahandler.enroll('csr')) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_cn_get') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') - def test_127_enroll(self, mock_lookup, mock_tmpl_lookup, mock_cn_get, mock_san_get, mock_enroll): - """ enroll() with certid """ - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.wait_interval = 0 - mock_lookup.return_value = 10 - mock_cn_get.return_value = 'cn' - mock_san_get.return_value = ['foo.bar.local'] - mock_enroll.return_value = ['error', 'cert_bundle', 'cert_raw'] - self.assertEqual(('error', 'cert_bundle', 'cert_raw', None), self.cahandler.enroll('csr')) - self.assertFalse(mock_tmpl_lookup.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_cn_get') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') - def test_128_enroll(self, mock_lookup, mock_tmpl_lookup, mock_cn_get, mock_san_get, mock_enroll): - """ enroll() with certid """ - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': None} - self.cahandler.wait_interval = 0 - mock_lookup.return_value = 10 - mock_cn_get.return_value = 'cn' - mock_san_get.return_value = ['foo.bar.local'] - mock_enroll.return_value = ['error', 'cert_bundle', 'cert_raw'] - self.assertEqual(('error', 'cert_bundle', 'cert_raw', None), self.cahandler.enroll('csr')) - self.assertTrue(mock_tmpl_lookup.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._csr_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._request_import') - @patch('examples.ca_handler.nclm_ca_handler.csr_san_get') - @patch('examples.ca_handler.nclm_ca_handler.csr_cn_get') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') - def test_129_enroll(self, mock_lookup, mock_cn_get, mock_san_get, mock_reqimp, mock_csr_lookup, mock_post, mock_cert_lookup, mock_tmpl_lookup, mock_bundle): - """ enroll() tmpload """ - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 'id', 'name': 'name'} - self.cahandler.wait_interval = 0 - self.cahandler.template_info_dic = {'name': 'name', 'id': None} - mock_lookup.return_value = 0 - mock_cn_get.return_value = 'cn' - mock_san_get.return_value = ['foo.bar.local'] - mock_reqimp.return_value = True - mock_csr_lookup.return_value = 10 - mock_post.return_value = True - mock_cert_lookup.return_value = 10 - mock_bundle.return_value = ('error', 'cert_bundle', 'cert_raw') - self.assertEqual(('enrollment aborted. policylink_id: 0, tsg_id: id', None, None, None), self.cahandler.enroll('csr')) - self.assertFalse(mock_tmpl_lookup.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_130_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - mock_post.return_value = 'mock_post' - self.assertEqual(('enrollment failed: mock_post', None, None), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertFalse(mock_cert_lookup.called) - self.assertFalse(mock_bundle.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_131_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': 'id'} - mock_post.return_value = 'mock_post' - self.assertEqual(('enrollment failed: mock_post', None, None), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertFalse(mock_cert_lookup.called) - self.assertFalse(mock_bundle.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_132_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': 'id'} - mock_post.return_value = {'status': 400} - self.assertEqual(("enrollment failed: {'status': 400}", None, None), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertFalse(mock_cert_lookup.called) - self.assertFalse(mock_bundle.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_133_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': 'id'} - mock_post.return_value = {'status': 400, 'message': 'message'} - self.assertEqual(("enrollment failed: message", None, None), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertFalse(mock_cert_lookup.called) - self.assertFalse(mock_bundle.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_134_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': 'id'} - mock_post.return_value = {'status': 200, 'message': None} - mock_cert_lookup.return_value = None - self.assertEqual(("certifcate id lookup failed for: cn, ['san']", None, None), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertTrue(mock_cert_lookup.called) - self.assertFalse(mock_bundle.called) - - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') - @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') - @patch('examples.ca_handler.nclm_ca_handler.b64_encode') - @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') - @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') - def test_135_cert_enroll(self, mock_build, mock_byte, mock_b64, mock_post, mock_cert_lookup, mock_bundle): - """ test cert_enroll() """ - self.cahandler.wait_interval = 0 - self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 10, 'name': 'name'} - self.cahandler.template_info_dic = {'name': 'name', 'id': 'id'} - mock_post.return_value = {'status': 200, 'message': None} - mock_cert_lookup.return_value = 10 - mock_bundle.return_value = ('error', 'cert_bundle', 'cert_raw') - self.assertEqual(('error', 'cert_bundle', 'cert_raw'), self.cahandler._cert_enroll('csr', 'cn', ['san'], 'policylink_id'))# - self.assertTrue(mock_build.called) - self.assertTrue(mock_byte.called) - self.assertTrue(mock_b64.called) - self.assertTrue(mock_cert_lookup.called) - self.assertTrue(mock_bundle.called) - - @patch('requests.get') - def test_136__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - response without next """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'certificates': ['foo', 'bar']} - mock_req.return_value = mockresponse - self.assertEqual(['foo', 'bar'], self.cahandler._cert_list_fetch('url')) - - @patch('requests.get') - def test_137__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - response 1x pagination """ - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificates': ['foo1', 'bar1'], 'next': 'url'} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificates': ['foo2', 'bar2']} - mock_req.side_effect = [mockresponse1, mockresponse2] - self.assertEqual(['foo1', 'bar1', 'foo2', 'bar2'], self.cahandler._cert_list_fetch('url')) - - @patch('requests.get') - def test_138__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - response 2x pagination """ - self.cahandler.api_host = 'api_host' - mockresponse1 = Mock() - mockresponse1.json = lambda: {'certificates': ['foo1', 'bar1'], 'next': 'url'} - mockresponse2 = Mock() - mockresponse2.json = lambda: {'certificates': ['foo2', 'bar2'], 'next': 'url'} - mockresponse3 = Mock() - mockresponse3.json = lambda: {'certificates': ['foo3', 'bar3']} - mock_req.side_effect = [mockresponse1, mockresponse2, mockresponse3] - self.assertEqual(['foo1', 'bar1', 'foo2', 'bar2', 'foo3', 'bar3'], self.cahandler._cert_list_fetch('url')) - - @patch('requests.get') - def test_139__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - empty response """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {} - mock_req.return_value = mockresponse - self.assertFalse(self.cahandler._cert_list_fetch('url')) - - @patch('requests.get') - def test_140__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - request.get triggers execption """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('foo') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._cert_list_fetch('url')) - self.assertIn('ERROR:test_a2c:CAhandler._cert_list_fetch() returned error: foo', lcm.output) - - @patch('requests.get') - def test_141__lastrequests_get(self, mock_req): - """ test_132__lastrequests_get() - all ok """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'requests': ['foo', 'bar', 'foo', 'bar']} - mock_req.return_value = mockresponse - self.assertEqual(['foo', 'bar', 'foo', 'bar'], self.cahandler._lastrequests_get()) - - @patch('requests.get') - def test_142__lastrequests_get(self, mock_req): - """ test_132__lastrequests_get() - all ok """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'requests': ['foo', 'bar', 'foo', 'bar']} - mock_req.return_value = mockresponse - self.assertEqual(['foo', 'bar', 'foo', 'bar'], self.cahandler._lastrequests_get()) - - @patch('requests.get') - def test_143__lastrequests_get(self, mock_req): - """ test_132__lastrequests_get() - no request list in response """ - self.cahandler.api_host = 'api_host' - mockresponse = Mock() - mockresponse.json = lambda: {'foo': 'bar'} - mock_req.return_value = mockresponse - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._lastrequests_get()) - self.assertIn('ERROR:test_a2c:_lastrequests_get(): response incomplete:', lcm.output) - - @patch('requests.get') - def test_144__cert_list_fetch(self, mock_req): - """ _cert_list_fetch() - request.get triggers execption """ - self.cahandler.api_host = 'api_host' - mock_req.side_effect = Exception('foo') - with self.assertLogs('test_a2c', level='INFO') as lcm: - self.assertFalse(self.cahandler._lastrequests_get()) - self.assertIn('ERROR:test_a2c:CAhandler._lastrequests_get() returned error: foo', lcm.output) - def test_145__ca_id_get(self): """ test _ca_id_get() """ ca_list = {} @@ -1762,24 +669,24 @@ def test_146__ca_id_get(self): def test_147__ca_id_get(self): """ test _ca_id_get() """ - ca_list = {'ca': {'items': 'bar'}} + ca_list = {'items': 'bar'} self.assertFalse(self.cahandler._ca_id_get(ca_list)) def test_148__ca_id_get(self): """ test _ca_id_get() """ - ca_list = {'ca': {'items': [{'foo': 'bar'}]}} + ca_list = {'items': [{'foo': 'bar'}]} self.assertFalse(self.cahandler._ca_id_get(ca_list)) def test_149__ca_id_get(self): """ test _ca_id_get() """ self.cahandler.ca_name = 'ca_name' - ca_list = {'ca': {'items': [{'displayName': 'ca_name', 'policyLinkId': 'id'}]}} + ca_list = {'items': [{'name': 'ca_name', 'id': 'id'}]} self.assertEqual('id', self.cahandler._ca_id_get(ca_list)) def test_150__ca_id_get(self): """ test _ca_id_get() """ self.cahandler.ca_name = 'ca_name' - ca_list = {'ca': {'items': [{'displayName': 'ca_name', 'foo': 'id'}]}} + ca_list = {'items': [{'name': 'ca_name', 'id1': 'id'}]} with self.assertLogs('test_a2c', level='INFO') as lcm: self.assertFalse(self.cahandler._ca_id_get(ca_list)) self.assertIn('ERROR:test_a2c:ca_id.lookup() policyLinkId field is missing ...', lcm.output) @@ -1787,7 +694,7 @@ def test_150__ca_id_get(self): def test_151__ca_id_get(self): """ test _ca_id_get() """ self.cahandler.ca_name = 'ca_name1' - ca_list = {'ca': {'items': [{'displayName': 'ca_name', 'policyLinkId': 'id'}]}} + ca_list = {'items': [{'name': 'ca_name', 'id': 'id'}]} self.assertFalse(self.cahandler._ca_id_get(ca_list)) @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_id_get') @@ -1795,9 +702,9 @@ def test_151__ca_id_get(self): def test_152__ca_policylink_id_lookup(self, mock_req, mock_caid): """ test _ca_policylink_id_lookup() """ self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 'id'} + self.cahandler.container_info_dic = {'id': 'id'} mockresponse = Mock() - mockresponse.json = lambda: {'ca': ['foo', 'bar', 'foo', 'bar']} + mockresponse.json = lambda: {'items': ['foo', 'bar', 'foo', 'bar']} mock_req.return_value = mockresponse mock_caid.return_value = 10 self.assertEqual(10, self.cahandler._ca_policylink_id_lookup()) @@ -1808,9 +715,9 @@ def test_152__ca_policylink_id_lookup(self, mock_req, mock_caid): def test_153__ca_policylink_id_lookup(self, mock_req, mock_caid): """ test _ca_policylink_id_lookup() """ self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 'id'} + self.cahandler.container_info_dic = {'id': 'id'} mockresponse = Mock() - mockresponse.json = lambda: {'ca': ['foo', 'bar', 'foo', 'bar']} + mockresponse.json = lambda: {'items': ['foo', 'bar', 'foo', 'bar']} mock_req.return_value = mockresponse mock_caid.return_value = None with self.assertLogs('test_a2c', level='INFO') as lcm: @@ -1823,7 +730,7 @@ def test_153__ca_policylink_id_lookup(self, mock_req, mock_caid): def test_154__ca_policylink_id_lookup(self, mock_req, mock_caid): """ test _ca_policylink_id_lookup() """ self.cahandler.api_host = 'api_host' - self.cahandler.tsg_info_dic = {'id': 'id'} + self.cahandler.container_info_dic = {'id': 'id'} mockresponse = Mock() mockresponse.json = lambda: {'foo': ['foo', 'bar', 'foo', 'bar']} mock_req.return_value = mockresponse @@ -1834,6 +741,401 @@ def test_154__ca_policylink_id_lookup(self, mock_req, mock_caid): self.assertIn('ERROR:test_a2c:ca_id.lookup() no CAs found in response ...', lcm.output) self.assertFalse(mock_caid.called) + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_get') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._csr_post') + def test_0155__cert_enroll(self, mock_post, mock_idget, mock_build): + """ test _cert_enroll() """ + mock_post.return_value = 'mock_post' + mock_idget.return_value = 'mock_idget' + mock_build.return_value = ('error', 'bundle', 'raw') + self.assertEqual(('error', 'bundle', 'raw', 'mock_idget'), self.cahandler._cert_enroll('cr', 'policylink_id')) + self.assertTrue(mock_post.called) + self.assertTrue(mock_idget.called) + self.assertTrue(mock_build.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_get') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._csr_post') + def test_0156__cert_enroll(self, mock_post, mock_idget, mock_build): + """ test _cert_enroll() """ + mock_post.return_value = 'mock_post' + mock_idget.return_value = None + mock_build.return_value = ('error', 'bundle', 'raw') + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual(('Certifcate_id lookup failed', None, None, None), self.cahandler._cert_enroll('cr', 'policylink_id')) + self.assertIn('ERROR:test_a2c:CAhandler.eroll(): certifcate_id lookup failed for job: mock_post', lcm.output) + self.assertTrue(mock_post.called) + self.assertTrue(mock_idget.called) + self.assertFalse(mock_build.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_bundle_build') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_get') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._csr_post') + def test_0157__cert_enroll(self, mock_post, mock_idget, mock_build): + """ test _cert_enroll() """ + mock_post.return_value = None + mock_idget.return_value = 'mock_idget' + mock_build.return_value = ('error', 'bundle', 'raw') + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual(('job_id lookup failed', None, None, None), self.cahandler._cert_enroll('cr', 'policylink_id')) + self.assertIn('ERROR:test_a2c:CAhandler.eroll(): job_id lookup failed for job', lcm.output) + self.assertTrue(mock_post.called) + self.assertFalse(mock_idget.called) + self.assertFalse(mock_build.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') + @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') + @patch('examples.ca_handler.nclm_ca_handler.b64_encode') + @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') + def test_0158__csr_post(self, mock_pem, mock_enc, mock_convert, mock_post): + """ test _csr_post() """ + mock_pem.return_value = 'mock_pem' + mock_enc.return_value = 'mock_enc' + mock_convert.return_value = 'mock_convert' + mock_post.return_value = {'id': 'id', 'foo': 'bar'} + self.assertEqual('id', self.cahandler._csr_post('csr', 'policylink_id')) + self.assertTrue(mock_convert.called) + self.assertTrue(mock_enc.called) + self.assertTrue(mock_pem.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') + @patch('examples.ca_handler.nclm_ca_handler.convert_string_to_byte') + @patch('examples.ca_handler.nclm_ca_handler.b64_encode') + @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') + def test_0159__csr_post(self, mock_pem, mock_enc, mock_convert, mock_post): + """ test _csr_post() """ + mock_pem.return_value = 'mock_pem' + mock_enc.return_value = 'mock_enc' + mock_convert.return_value = 'mock_convert' + mock_post.return_value = {'foo': 'bar'} + self.cahandler.template_info_dic = {'id': 'id'} + self.assertFalse(self.cahandler._csr_post('csr', 'policylink_id')) + self.assertTrue(mock_convert.called) + self.assertTrue(mock_enc.called) + self.assertTrue(mock_pem.called) + + @patch('requests.get') + def test_0160__issuer_certid_get(self, mock_req): + """ test _issuer_certid_get() """ + cert_dic = {'urls': {'issuer': 'issuer'}} + self.cahandler.api_host = 'api_host' + mockresponse = Mock() + mockresponse.json = lambda: {'urls': {'certificate': 'foo/v2/certificates/'}} + mock_req.return_value = mockresponse + self.assertEqual(('foo', True), self.cahandler._issuer_certid_get(cert_dic)) + + @patch('requests.get') + def test_0161__issuer_certid_get(self, mock_req): + """ test _issuer_certid_get() """ + cert_dic = {'urls': {'issuer': 'issuer'}} + self.cahandler.api_host = 'api_host' + mockresponse = Mock() + mockresponse.json = lambda: {'urls': {'bar': 'foo/v2/certificates/'}} + mock_req.return_value = mockresponse + self.assertEqual((None, False), self.cahandler._issuer_certid_get(cert_dic)) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._issuer_certid_get') + @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') + @patch('requests.get') + def test_0161__cert_bundle_build(self, mock_req, mock_pem, mock_certid): + """ test _cert_bundle_build() """ + mock_pem.return_value = 'mock_pem' + mock_certid.return_value = ('id', False) + mockresponse = Mock() + mockresponse.json = lambda: {'der': 'der'} + mock_req.return_value = mockresponse + self.assertEqual((None, 'mock_pem', 'der'), self.cahandler._cert_bundle_build('cert_id')) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._issuer_certid_get') + @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') + @patch('requests.get') + def test_0162__cert_bundle_build(self, mock_req, mock_pem, mock_certid): + """ test _cert_bundle_build() """ + mock_pem.side_effect = ['mock_pem1', 'mock_pem2'] + mock_certid.side_effect = [('id1', True), ('id2', False)] + mockresponse = Mock() + mockresponse.json = lambda: {'der': 'der'} + mock_req.return_value = mockresponse + self.assertEqual((None, 'mock_pem2', 'der'), self.cahandler._cert_bundle_build('cert_id')) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._issuer_certid_get') + @patch('examples.ca_handler.nclm_ca_handler.build_pem_file') + @patch('requests.get') + def test_0163__cert_bundle_build(self, mock_req, mock_pem, mock_certid): + """ test _cert_bundle_build() """ + mock_pem.return_value = '' + mock_certid.return_value = ('id', False) + mockresponse = Mock() + mockresponse.json = lambda: {'der': 'der'} + mock_req.return_value = mockresponse + self.assertEqual((None, None, 'der'), self.cahandler._cert_bundle_build('cert_id')) + + @patch('time.sleep') + @patch('requests.get') + def test_0164__cert_id_get(self, mock_req, mock_sleep): + """ test _cert_id_get() """ + mockresponse = Mock() + mockresponse.json = lambda: {'status': 'done', 'entities': [{'ref': 'certificate', 'url': 'foo/v2/certificates/'}]} + mock_req.return_value = mockresponse + self.assertEqual('foo', self.cahandler._cert_id_get(10)) + + @patch('time.sleep') + @patch('requests.get') + def test_0165__cert_id_get(self, mock_req, mock_sleep): + """ test _cert_id_get() """ + mockresponse1 = Mock() + mockresponse1.json = lambda: {'status': 'note', 'entities': [{'ref': 'certificate', 'url': 'foo1/v2/certificates/'}]} + mockresponse2 = Mock() + mockresponse2.json = lambda: {'status': 'done', 'entities': [{'ref': 'certificate', 'url': 'foo2/v2/certificates/'}]} + mock_req.side_effect = [mockresponse1, mockresponse2] + self.assertEqual('foo2', self.cahandler._cert_id_get(10)) + + @patch('requests.get') + @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') + def test_0166__certid_get_from_serial(self, mock_serial, mock_req): + """ _certid_get_from_serial() """ + mock_serial.return_value = 'mock_serial' + mockresponse = Mock() + mockresponse.json = lambda: {'items': [{'id': 'id1'}, {'id': 'id2'}]} + mock_req.return_value = mockresponse + self.assertEqual('id1', self.cahandler._certid_get_from_serial('cert_raw')) + + @patch('requests.get') + @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') + def test_0167__certid_get_from_serial(self, mock_serial, mock_req): + """ _certid_get_from_serial() """ + mock_serial.return_value = 'mock_serial' + mockresponse = Mock() + mockresponse.json = lambda: {'items': [{'di': 'id1'}, {'id': 'id2'}]} + mock_req.return_value = mockresponse + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual(None, self.cahandler._certid_get_from_serial('cert_raw')) + self.assertIn('ERROR:test_a2c:CAhandler._certid_get_from_serial(): no certificate found for serial: mock_serial', lcm.output) + + @patch('requests.get') + @patch('examples.ca_handler.nclm_ca_handler.cert_serial_get') + def test_0167__certid_get_from_serial(self, mock_serial, mock_req): + """ _certid_get_from_serial() """ + mock_serial.return_value = 'mock_serial' + mock_req.side_effect = Exception('mock_req') + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual(None, self.cahandler._certid_get_from_serial('cert_raw')) + self.assertIn('ERROR:test_a2c:CAhandler._certid_get_from_serial(): request get aborted with err: mock_req', lcm.output) + self.assertIn('ERROR:test_a2c:CAhandler._certid_get_from_serial(): no certificate found for serial: mock_serial', lcm.output) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._certid_get_from_serial') + @patch('examples.ca_handler.nclm_ca_handler.header_info_get') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0168__cert_id_lookup(self, mock_enc, mock_info, mock_serial): + """ test _cert_id_lookup() """ + mock_enc.return_value = 'mock_enc' + mock_info.return_value = [{'poll_identifier': 'poll_identifier'}] + mock_serial.return_value = 'mock_serial' + self.assertEqual('poll_identifier', self.cahandler._cert_id_lookup('cert_raw')) + self.assertFalse(mock_serial.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._certid_get_from_serial') + @patch('examples.ca_handler.nclm_ca_handler.header_info_get') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0169__cert_id_lookup(self, mock_enc, mock_info, mock_serial): + """ test _cert_id_lookup() """ + mock_enc.return_value = 'mock_enc' + mock_info.return_value = [{'poll_identifier': None}] + mock_serial.return_value = 'mock_serial' + self.assertEqual('mock_serial', self.cahandler._cert_id_lookup('cert_raw')) + self.assertTrue(mock_serial.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._certid_get_from_serial') + @patch('examples.ca_handler.nclm_ca_handler.header_info_get') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0170__cert_id_lookup(self, mock_enc, mock_info, mock_serial): + """ test _cert_id_lookup() """ + mock_enc.return_value = 'mock_enc' + mock_info.return_value = [{'foo': 'bar'}] + mock_serial.return_value = 'mock_serial' + self.assertEqual('mock_serial', self.cahandler._cert_id_lookup('cert_raw')) + self.assertTrue(mock_serial.called) + + @patch('time.sleep') + @patch('requests.get') + def test_0171__revocation_status_poll(self, mock_req, mock_sleep): + """ test _revocation_status_poll() """ + mockresponse = Mock() + mockresponse.json = lambda: {'status': 'done'} + mock_req.return_value = mockresponse + err_dic = {'serverinternal': 'serverinternal'} + self.assertEqual((200, None, None), self.cahandler._revocation_status_poll('cert_id', err_dic)) + + @patch('time.sleep') + @patch('requests.get') + def test_0172__revocation_status_poll(self, mock_req, mock_sleep): + """ test _revocation_status_poll() """ + mockresponse = Mock() + mockresponse.json = lambda: {'status': 'failed'} + mock_req.return_value = mockresponse + err_dic = {'serverinternal': 'serverinternal'} + self.assertEqual((500, 'serverinternal', 'Revocation operation failed: error from API'), self.cahandler._revocation_status_poll('cert_id', err_dic)) + + @patch('time.sleep') + @patch('requests.get') + def test_0173__revocation_status_poll(self, mock_req, mock_sleep): + """ test _revocation_status_poll() """ + mockresponse1 = Mock() + mockresponse1.json = lambda: {'status': 'pending'} + mockresponse2 = Mock() + mockresponse2.json = lambda: {'status': 'done'} + mock_req.side_effect = [mockresponse2, mockresponse2] + err_dic = {'serverinternal': 'serverinternal'} + self.assertEqual((200, None, None), self.cahandler._revocation_status_poll('cert_id', err_dic)) + + @patch('time.sleep') + @patch('requests.get') + def test_0174__revocation_status_poll(self, mock_req, mock_sleep): + """ test _revocation_status_poll() """ + mockresponse1 = Mock() + mockresponse1.json = lambda: {'status': 'pending'} + mockresponse2 = Mock() + mockresponse2.json = lambda: {'status': 'failed'} + mock_req.side_effect = [mockresponse2, mockresponse2] + err_dic = {'serverinternal': 'serverinternal'} + self.assertEqual((500, 'serverinternal', 'Revocation operation failed: error from API'), self.cahandler._revocation_status_poll('cert_id', err_dic)) + + @patch('time.sleep') + @patch('requests.get') + def test_0175__revocation_status_poll(self, mock_req, mock_sleep): + """ test _revocation_status_poll() """ + mockresponse = Mock() + mockresponse.json = lambda: {'status': 'pending'} + mock_req.return_value = mockresponse + err_dic = {'serverinternal': 'serverinternal'} + self.assertEqual((500, 'serverinternal', 'Revocation operation failed: Timeout'), self.cahandler._revocation_status_poll('cert_id', err_dic)) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0176_enroll(self, mock_recode, mock_policy, mock_template, mock_enroll): + """ test enroll """ + mock_recode.return_value = 'csr' + mock_policy.return_value = 'policylink_id' + mock_template.return_value = 'template_id' + mock_enroll.return_value = ('error', 'bundle', 'raw', 'cert_id') + self.cahandler.template_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': 'id'} + self.assertEqual(('error', 'bundle', 'raw', 'cert_id'), self.cahandler.enroll('csr')) + self.assertTrue(mock_recode.called) + self.assertTrue(mock_policy.called) + self.assertTrue(mock_template.called) + self.assertTrue(mock_enroll.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0177_enroll(self, mock_recode, mock_policy, mock_template, mock_enroll): + """ test enroll """ + mock_recode.return_value = 'csr' + mock_policy.return_value = 'policylink_id' + mock_template.return_value = 'template_id' + mock_enroll.return_value = ('error', 'bundle', 'raw', 'cert_id') + self.cahandler.template_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': None} + self.assertEqual(('CAhandler.eroll(): ID lookup for container"name" failed.', None, None, None), self.cahandler.enroll('csr')) + self.assertTrue(mock_recode.called) + self.assertFalse(mock_policy.called) + self.assertFalse(mock_template.called) + self.assertFalse(mock_enroll.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0178_enroll(self, mock_recode, mock_policy, mock_template, mock_enroll): + """ test enroll """ + mock_recode.return_value = 'csr' + mock_policy.return_value = None + mock_template.return_value = 'template_id' + mock_enroll.return_value = ('error', 'bundle', 'raw', 'cert_id') + self.cahandler.template_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': 'id'} + self.assertEqual(('Enrollment aborted. ca: None, tsg_id: id', None, None, None), self.cahandler.enroll('csr')) + self.assertTrue(mock_recode.called) + self.assertTrue(mock_policy.called) + self.assertFalse(mock_template.called) + self.assertFalse(mock_enroll.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0179_enroll(self, mock_recode, mock_policy, mock_template, mock_enroll): + """ test enroll """ + mock_recode.return_value = 'csr' + mock_policy.return_value = None + mock_template.return_value = 'template_id' + mock_enroll.return_value = ('error', 'bundle', 'raw', 'cert_id') + self.cahandler.template_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': 'id'} + self.cahandler.error = 'error' + self.assertEqual(('error', None, None, None), self.cahandler.enroll('csr')) + self.assertTrue(mock_recode.called) + self.assertFalse(mock_policy.called) + self.assertFalse(mock_template.called) + self.assertFalse(mock_enroll.called) + + @patch('examples.ca_handler.nclm_ca_handler.eab_profile_header_info_check') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_enroll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._template_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._ca_policylink_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.b64_url_recode') + def test_0180_enroll(self, mock_recode, mock_policy, mock_template, mock_enroll, mock_eab): + """ test enroll """ + mock_recode.return_value = 'csr' + mock_policy.return_value = 'policylink_id' + mock_template.return_value = 'template_id' + mock_enroll.return_value = ('error', 'bundle', 'raw', 'cert_id') + mock_eab.return_value = 'eab' + self.cahandler.template_info_dic = {'name': 'name', 'id': None} + self.cahandler.container_info_dic = {'name': 'name', 'id': 'id'} + self.assertEqual(('eab', None, None, None), self.cahandler.enroll('csr')) + self.assertTrue(mock_recode.called) + self.assertTrue(mock_policy.called) + self.assertTrue(mock_template.called) + self.assertFalse(mock_enroll.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._revocation_status_poll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.error_dic_get') + def test_180_revoke(self, mock_err, mock_idl, mock_post, mock_poll): + """ test revoke """ + mock_err.return_value = {'foo': 'bar', 'serverinternal': 'serverinternal'} + mock_idl.return_value = 'cert_id' + mock_post.return_value = {'urls': {'job': 'foo/v2/jobs/'}} + mock_poll.return_value = (200, 'message', 'detail') + self.assertEqual((200, 'message', 'detail'), self.cahandler.revoke('cert_raw')) + self.assertTrue(mock_err.called) + self.assertTrue(mock_idl.called) + self.assertTrue(mock_post.called) + self.assertTrue(mock_poll.called) + + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._revocation_status_poll') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._api_post') + @patch('examples.ca_handler.nclm_ca_handler.CAhandler._cert_id_lookup') + @patch('examples.ca_handler.nclm_ca_handler.error_dic_get') + def test_181_revoke(self, mock_err, mock_idl, mock_post, mock_poll): + """ test revoke """ + mock_err.return_value = {'foo': 'bar', 'serverinternal': 'serverinternal'} + mock_idl.return_value = 'cert_id' + mock_post.return_value = {'urls': {'foo': 'foo'}} + mock_poll.return_value = (200, 'message', 'detail') + self.assertEqual((500, 'serverinternal', 'Revocation operation failed'), self.cahandler.revoke('cert_raw')) + self.assertTrue(mock_err.called) + self.assertTrue(mock_idl.called) + self.assertTrue(mock_post.called) + self.assertFalse(mock_poll.called) + if __name__ == '__main__': if os.path.exists('acme_test.db'):