Skip to content

Commit

Permalink
[tst] unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Oct 30, 2024
1 parent 428b0ce commit dc88e12
Show file tree
Hide file tree
Showing 3 changed files with 1,284 additions and 78 deletions.
53 changes: 34 additions & 19 deletions examples/ca_handler/entrust_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def _certificates_get_from_serial(self, cert_serial: str) -> List[str]:
if code == 200 and 'certificates' in content:
cert_list = content['certificates']
else:
self.logger.error('CAhandler._certificates_get_from_serial() for %s failed with code: %s', cert_serial, code)
cert_list = []

self.logger.debug('CAhandler._certificates_get_from_serial() ended with code: %s and %s certificate', code, len(cert_list))
Expand All @@ -184,8 +185,15 @@ def _config_load(self):
if 'CAhandler' in config_dic:
cfg_dic = dict(config_dic['CAhandler'])
self.api_url = cfg_dic.get('api_url', 'https://api.entrust.net/enterprise/v2')
self.request_timeout = int(cfg_dic.get('request_timeout', 10))
self.cert_validity_days = int(cfg_dic.get('cert_validity_days', 365))
try:
self.request_timeout = int(cfg_dic.get('request_timeout', 10))
except Exception as err:
self.logger.error('CAhandler._config_load(): failed to parse request_timeout %s', err)
try:
self.cert_validity_days = int(cfg_dic.get('cert_validity_days', 365))
except Exception as err:
self.logger.error('CAhandler._config_load(): failed to parse cert_validity_days %s', err)

self.username = cfg_dic.get('username', None)
self.password = cfg_dic.get('password', None)
self.organization_name = cfg_dic.get('organization_name', None)
Expand All @@ -198,8 +206,8 @@ def _config_load(self):
except Exception as err:
self.logger.error('CAhandler._config_load(): failed to parse allowed_domainlist: %s', err)
self.allowed_domainlist = 'failed to parse'
# load root CA
self._config_root_load(config_dic)
# load root CA
self._config_root_load(config_dic)

# load profiling
self.eab_profiling, self.eab_handler = config_eab_profile_load(self.logger, config_dic)
Expand Down Expand Up @@ -229,7 +237,6 @@ def _config_passphrase_load(self, config_dic: Dict[str, str]):
def _config_root_load(self, config_dic: Dict[str, str]):
""" load root CA """
self.logger.debug('CAhandler._config_root_load()')

if 'entrust_root_cert' in config_dic['CAhandler']:
if os.path.isfile(config_dic['CAhandler']['entrust_root_cert']):
self.logger.debug('CAhandler._config_root_load(): load root CA from config file')
Expand Down Expand Up @@ -304,13 +311,15 @@ def _organizations_get(self) -> Dict[str, str]:
self.logger.debug('CAhandler._organizations_get()')

code, content = self._api_get(self.api_url + '/organizations')

org_dic = {}
if code == 200 and 'organizations' in content:
self.logger.debug('CAhandler._organizations_get() ended with code: 200')
for org in content['organizations']:
if org['verificationStatus'] == 'APPROVED':
org_dic[org['name']] = org['clientId']
if 'verificationStatus' in org and org['verificationStatus'] == 'APPROVED':
if 'name' in org and 'clientId' in org:
org_dic[org['name']] = org['clientId']
else:
self.logger.error('CAhandler._organizations_get(): malformed response')

self.logger.debug('CAhandler._organizations_get() ended with code: %s', code)
return org_dic
Expand All @@ -326,8 +335,11 @@ def _domains_get(self, org_id: str) -> List[str]:
self.logger.debug('CAhandler._domains_get() ended with code: 200')

for domain in content['domains']:
if domain['verificationStatus'] == 'APPROVED':
api_domain_list.append(domain['domainName'])
if 'verificationStatus' in domain and domain['verificationStatus'] == 'APPROVED':
if 'domainName' in domain:
api_domain_list.append(domain['domainName'])
else:
self.logger.error('CAhandler._domains_get(): malformed response')

self.logger.debug('CAhandler._domains_get() ended with code: %s', code)
return api_domain_list
Expand Down Expand Up @@ -388,16 +400,20 @@ def _trackingid_get(self, cert_raw: str) -> int:
tracking_id = None
# we misuse header_info_get() to get the tracking id from database
pid_list = header_info_get(self.logger, csr=cert_raw, vlist=['poll_identifier'], field_name='cert_raw')
if pid_list and len(pid_list) > 0:
tracking_id = pid_list[0]['poll_identifier']
for ele in pid_list:
if 'poll_identifier' in ele:
tracking_id = ele['poll_identifier']
break

if not tracking_id:
# lookup through Entrust API
self.logger.info('CAhandler._trackingid_get(): tracking_id not found in database. Lookup trough Entrust API')
cert_serial = cert_serial_get(self.logger, cert_raw, hexformat=True)
certificate_list = self._certificates_get_from_serial(cert_serial)
if certificate_list and len(certificate_list) > 0:
tracking_id = certificate_list[0]['trackingId']
for ele in certificate_list:
if 'trackingId' in ele:
tracking_id = ele['trackingId']
break

self.logger.debug('CAhandler._trackingid_get() ended with %s', tracking_id)
return tracking_id
Expand All @@ -421,8 +437,10 @@ def _response_parse(self, content: Dict[str, str]) -> Tuple[str, str]:
cert_bundle += ca_cert + '\n'

# add Entrust Root CA
cert_bundle += self.entrust_root_cert + '\n'

if cert_bundle:
cert_bundle += self.entrust_root_cert + '\n'
else:
cert_bundle = self.entrust_root_cert + '\n'
self.logger.debug('CAhandler._response_parse() ended')
return cert_bundle, cert_raw, poll_indentifier

Expand Down Expand Up @@ -455,9 +473,7 @@ def _enroll(self, csr: str) -> Tuple[str, str]:
code, content = self._api_post(self.api_url + '/certificates', data_dic)

if code == 201:

cert_bundle, cert_raw, poll_indentifier = self._response_parse(content)

else:
if 'errors' in content:
error = f"Error during order creation: {code} - {content['errors']}"
Expand Down Expand Up @@ -507,7 +523,6 @@ def revoke(self, certificate_raw: str, _rev_reason: str = 'unspecified', _rev_da

# get tracking id as input for revocation call
tracking_id = self._trackingid_get(certificate_raw)
# tracking_id = 7347070

if tracking_id:
code, content = self._api_post(self.api_url + f'/certificates/{tracking_id}/revocations', {'crlReason': _rev_reason, 'revocationComment': 'revoked by acme2certifier'})
Expand Down
Loading

0 comments on commit dc88e12

Please sign in to comment.