Skip to content

Commit

Permalink
[fix] refactor api-calls
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Nov 27, 2024
1 parent eb5c827 commit 1e86f54
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 138 deletions.
55 changes: 55 additions & 0 deletions acme_srv/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -1861,3 +1861,58 @@ def eab_profile_string_check(logger, cahandler, key, value):
logger.error('Helper.eab_profile_string_check(): ignore string attribute: key: %s value: %s', key, value)

logger.debug('Helper.eab_profile_string_check() ended')

def request_operation(logger: logging.Logger, headers: Dict[str, str], proxy: Dict[str, str] = {}, timeout: int = 20, url: str = None, method: str = 'GET', payload: Dict[str, str] = None):
""" check if a for a string value taken from profile if its a variable inside a class and apply value """
logger.debug('Helper.api_operation(): method: %s', method)

try:
if method.lower() == 'get':
api_response = requests.get(url=url, headers=headers, proxies=proxy, timeout=timeout)
elif method.lower() == 'post':
api_response = requests.post(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
elif method.lower() == 'put':
api_response = requests.put(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
else:
logger.error('unknown request method: %s', method)
api_response = None

code = api_response.status_code
if api_response.text:
try:
content = api_response.json()
except Exception as err_:
logger.error('CAhandler._api_get() returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None

except Exception as err_:
logger.error('CAhandler._api_get() returned error: %s', err_)
code = 500
content = str(err_)

logger.debug('Helper.api_operation() ended with: %s', code)
return code, content


def csr_cn_lookup(logger: logging.Logger, csr: str) -> str:
""" lookup CN/ 1st san from CSR """
logger.debug('CAhandler._csr_cn_lookup()')

csr_cn = csr_cn_get(logger, csr)
if not csr_cn:
# lookup first san
san_list = csr_san_get(logger, csr)
if san_list and len(san_list) > 0:
for san in san_list:
try:
csr_cn = san.split(':')[1]
break
except Exception as err:
logger.error('CAhandler._csr_cn_lookup() split failed: %s', err)
else:
logger.error('CAhandler._csr_cn_lookup() no SANs found in CSR')

logger.debug('CAhandler._csr_cn_lookup() ended with: %s', csr_cn)
return csr_cn
76 changes: 8 additions & 68 deletions examples/ca_handler/digicert_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import json
import requests
# pylint: disable=e0401
from acme_srv.helper import load_config, csr_cn_get, cert_pem2der, b64_encode, allowed_domainlist_check, eab_profile_header_info_check, uts_now, uts_to_date_utc, cert_serial_get, config_eab_profile_load, config_headerinfo_load, csr_san_get
from acme_srv.helper import load_config, cert_pem2der, b64_encode, allowed_domainlist_check, eab_profile_header_info_check, uts_now, uts_to_date_utc, cert_serial_get, config_eab_profile_load, config_headerinfo_load, request_operation, csr_cn_lookup


CONTENT_TYPE = 'application/json'
Expand Down Expand Up @@ -61,20 +61,9 @@ def _api_get(self, url: str) -> Tuple[int, Dict[str, str]]:
'X-DC-DEVKEY': self.api_key,
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='get', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=None)

try:
api_response = requests.get(url=url, headers=headers, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_get() returned error during json parsing: %s', err_)
content = str(err_)
except Exception as err_:
self.logger.error('CAhandler._api_get() returned error: %s', err_)
code = 500
content = str(err_)

self.logger.debug('CAhandler._api_get() ended with code: %s', code)
return code, content

def _api_post(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]:
Expand All @@ -84,23 +73,9 @@ def _api_post(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]
'X-DC-DEVKEY': self.api_key,
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='post', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)

try:
api_response = requests.post(url=url, headers=headers, json=data, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
if api_response.text:
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_post() returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None
except Exception as err_:
self.logger.error('CAhandler._api_post() returned error: %s', err_)
code = 500
content = str(err_)

self.logger.debug('CAhandler._api_post() ended with code: %s', code)
return code, content

def _api_put(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]:
Expand All @@ -110,23 +85,9 @@ def _api_put(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]
'X-DC-DEVKEY': self.api_key,
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='put', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)

try:
api_response = requests.put(url=url, headers=headers, json=data, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
if api_response.text:
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_put() returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None
except Exception as err_:
self.logger.error('CAhandler._api_put() returned error: %s', err_)
code = 500
content = str(err_)

self.logger.debug('CAhandler._api_put() ended with code: %s', code)
return code, content

def _config_check(self) -> str:
Expand Down Expand Up @@ -273,27 +234,6 @@ def _csr_check(self, csr: str) -> str:
self.logger.debug('CAhandler._csr_check() ended with: %s', error)
return error

def _csr_cn_lookup(self, csr: str) -> str:
""" lookup CN/ 1st san from CSR """
self.logger.debug('CAhandler._csr_cn_lookup()')

csr_cn = csr_cn_get(self.logger, csr)
if not csr_cn:
# lookup first san
san_list = csr_san_get(self.logger, csr)
if san_list and len(san_list) > 0:
for san in san_list:
try:
csr_cn = san.split(':')[1]
break
except Exception as err:
self.logger.error('CAhandler._csr_cn_lookup() split failed: %s', err)
else:
self.logger.error('CAhandler._csr_cn_lookup() no SANs found in CSR')

self.logger.debug('CAhandler._csr_cn_lookup() ended with: %s', csr_cn)
return csr_cn

def enroll(self, csr: str) -> Tuple[str, str, str, str]:
""" enroll certificate """
self.logger.debug('CAhandler.enroll()')
Expand All @@ -311,7 +251,7 @@ def enroll(self, csr: str) -> Tuple[str, str, str, str]:
error = self._csr_check(csr)

if not error:
csr_cn = self._csr_cn_lookup(csr)
csr_cn = csr_cn_lookup(self.logger, csr)
code, content = self._order_send(csr, csr_cn)

if code in (200, 201):
Expand Down
77 changes: 8 additions & 69 deletions examples/ca_handler/entrust_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
from requests_pkcs12 import Pkcs12Adapter
# pylint: disable=e0401
from acme_srv.helper import load_config, csr_cn_get, cert_pem2der, b64_encode, allowed_domainlist_check, eab_profile_header_info_check, uts_now, uts_to_date_utc, cert_serial_get, config_eab_profile_load, config_headerinfo_load, csr_san_get, header_info_get, b64_url_recode
from acme_srv.helper import load_config, cert_pem2der, b64_encode, allowed_domainlist_check, eab_profile_header_info_check, uts_now, uts_to_date_utc, cert_serial_get, config_eab_profile_load, config_headerinfo_load, header_info_get, b64_url_recode, request_operation, csr_cn_lookup


CONTENT_TYPE = 'application/json'
Expand Down Expand Up @@ -97,19 +97,8 @@ def _api_get(self, url: str) -> Tuple[int, Dict[str, str]]:
'Content-Type': CONTENT_TYPE
}

try:
api_response = self.session.get(url=url, headers=headers, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_get() returned error during json parsing: %s', err_)
content = str(err_)
except Exception as err_:
self.logger.error('CAhandler._api_get() returned error: %s', err_)
code = 500
content = str(err_)

code, content = request_operation(self.logger, method='get', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=None)
self.logger.debug('CAhandler._api_get() ended with code: %s', code)
return code, content

def _api_post(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]:
Expand All @@ -118,23 +107,8 @@ def _api_post(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]
headers = {
'Content-Type': CONTENT_TYPE
}

try:
api_response = self.session.post(url=url, headers=headers, json=data, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
if api_response.text:
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_post() returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None
except Exception as err_:
self.logger.error('CAhandler._api_post() returned error: %s', err_)
code = 500
content = str(err_)

code, content = request_operation(self.logger, method='post', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)
self.logger.debug('CAhandler._api_post() ended with code: %s', code)
return code, content

def _api_put(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]:
Expand All @@ -143,23 +117,9 @@ def _api_put(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]
headers = {
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='put', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)

try:
api_response = self.session.put(url=url, headers=headers, json=data, proxies=self.proxy, timeout=self.request_timeout)
code = api_response.status_code
if api_response.text:
try:
content = api_response.json()
except Exception as err_:
self.logger.error('CAhandler._api_put() returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None
except Exception as err_:
self.logger.error('CAhandler._api_put() returned error: %s', err_)
code = 500
content = str(err_)

self.logger.debug('CAhandler._api_put() ended with code: %s', code)
return code, content

def _certificates_get_from_serial(self, cert_serial: str) -> List[str]:
Expand Down Expand Up @@ -273,27 +233,6 @@ def _config_session_load(self, config_dic: Dict[str, str]):

self.logger.debug('CAhandler._config_session_load() ended')

def _csr_cn_lookup(self, csr: str) -> str:
""" lookup CN/ 1st san from CSR """
self.logger.debug('CAhandler._csr_cn_lookup()')

csr_cn = csr_cn_get(self.logger, csr)
if not csr_cn:
# lookup first san
san_list = csr_san_get(self.logger, csr)
if san_list and len(san_list) > 0:
for san in san_list:
try:
csr_cn = san.split(':')[1]
break
except Exception as err:
self.logger.error('CAhandler._csr_cn_lookup() split failed: %s', err)
else:
self.logger.error('CAhandler._csr_cn_lookup() no SANs found in CSR')

self.logger.debug('CAhandler._csr_cn_lookup() ended with: %s', csr_cn)
return csr_cn

def _org_domain_cfg_check(self) -> str:
""" check organizations """
self.logger.debug('CAhandler._organizations_check()')
Expand Down Expand Up @@ -462,7 +401,7 @@ def _enroll(self, csr: str) -> Tuple[str, str]:
poll_indentifier = None

# get CN and SANs
cn = self._csr_cn_lookup(csr)
cn = csr_cn_lookup(self.logger, csr)

# calculate cert expiry date
certexpirydate = datetime.datetime.now() + datetime.timedelta(days=self.cert_validity_days)
Expand Down
Loading

0 comments on commit 1e86f54

Please sign in to comment.