Skip to content

Commit

Permalink
[tst] unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Nov 28, 2024
1 parent 1e86f54 commit 0275801
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 174 deletions.
14 changes: 7 additions & 7 deletions acme_srv/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -1862,17 +1862,17 @@ def eab_profile_string_check(logger, cahandler, key, value):

logger.debug('Helper.eab_profile_string_check() ended')

def request_operation(logger: logging.Logger, headers: Dict[str, str], proxy: Dict[str, str] = {}, timeout: int = 20, url: str = None, method: str = 'GET', payload: Dict[str, str] = None):
def request_operation(logger: logging.Logger, headers: Dict[str, str] = {}, proxy: Dict[str, str] = {}, timeout: int = 20, url: str = None, session=requests, method: str = 'GET', payload: Dict[str, str] = None):
""" check if a for a string value taken from profile if its a variable inside a class and apply value """
logger.debug('Helper.api_operation(): method: %s', method)

try:
if method.lower() == 'get':
api_response = requests.get(url=url, headers=headers, proxies=proxy, timeout=timeout)
api_response = session.get(url=url, headers=headers, proxies=proxy, timeout=timeout)
elif method.lower() == 'post':
api_response = requests.post(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
api_response = session.post(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
elif method.lower() == 'put':
api_response = requests.put(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
api_response = session.put(url=url, headers=headers, proxies=proxy, timeout=timeout, json=payload)
else:
logger.error('unknown request method: %s', method)
api_response = None
Expand All @@ -1882,17 +1882,17 @@ def request_operation(logger: logging.Logger, headers: Dict[str, str], proxy: Di
try:
content = api_response.json()
except Exception as err_:
logger.error('CAhandler._api_get() returned error during json parsing: %s', err_)
logger.error('request_operation returned error during json parsing: %s', err_)
content = str(err_)
else:
content = None

except Exception as err_:
logger.error('CAhandler._api_get() returned error: %s', err_)
logger.error('request_operation returned error: %s', err_)
code = 500
content = str(err_)

logger.debug('Helper.api_operation() ended with: %s', code)
logger.debug('Helper.request_operation() ended with: %s', code)
return code, content


Expand Down
6 changes: 3 additions & 3 deletions examples/ca_handler/entrust_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def _api_get(self, url: str) -> Tuple[int, Dict[str, str]]:
'Content-Type': CONTENT_TYPE
}

code, content = request_operation(self.logger, method='get', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=None)
code, content = request_operation(self.logger, session=self.session, method='get', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=None)
self.logger.debug('CAhandler._api_get() ended with code: %s', code)
return code, content

Expand All @@ -107,7 +107,7 @@ def _api_post(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]
headers = {
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='post', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)
code, content = request_operation(self.logger, session=self.session, method='post', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)
self.logger.debug('CAhandler._api_post() ended with code: %s', code)
return code, content

Expand All @@ -117,7 +117,7 @@ def _api_put(self, url: str, data: Dict[str, str]) -> Tuple[int, Dict[str, str]]
headers = {
'Content-Type': CONTENT_TYPE
}
code, content = request_operation(self.logger, method='put', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)
code, content = request_operation(self.logger, session=self.session, method='put', url=url, headers=headers, proxy=self.proxy, timeout=self.request_timeout, payload=data)

self.logger.debug('CAhandler._api_put() ended with code: %s', code)
return code, content
Expand Down
65 changes: 19 additions & 46 deletions test/test_digicert.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_010__api_post(self, mock_req):
mock_req.return_value = mockresponse
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual(('status_code', "'str' object is not callable"), self.cahandler._api_post('url', 'data'))
self.assertIn("ERROR:test_a2c:CAhandler._api_post() returned error during json parsing: 'str' object is not callable", lcm.output)
self.assertIn("ERROR:test_a2c:request_operation returned error during json parsing: 'str' object is not callable", lcm.output)

@patch('requests.post')
def test_011__api_post(self, mock_req):
Expand All @@ -121,7 +121,7 @@ def test_012__api_post(self, mock_req):
mock_req.side_effect = Exception('exc_api_post')
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual((500, 'exc_api_post'), self.cahandler._api_post('url', 'data'))
self.assertIn('ERROR:test_a2c:CAhandler._api_post() returned error: exc_api_post', lcm.output)
self.assertIn('ERROR:test_a2c:request_operation returned error: exc_api_post', lcm.output)

@patch.object(requests, 'get')
def test_013__api_get(self, mock_req):
Expand All @@ -141,7 +141,7 @@ def test_014__api_get(self, mock_req):
mock_req.return_value = mockresponse
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual(('status_code', "'str' object is not callable"), self.cahandler._api_get('url'))
self.assertIn("ERROR:test_a2c:CAhandler._api_get() returned error during json parsing: 'str' object is not callable", lcm.output)
self.assertIn("ERROR:test_a2c:request_operation returned error during json parsing: 'str' object is not callable", lcm.output)

@patch('requests.get')
def test_015__api_get(self, mock_req):
Expand All @@ -151,7 +151,7 @@ def test_015__api_get(self, mock_req):
mock_req.side_effect = Exception('exc_api_get')
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual((500, 'exc_api_get'), self.cahandler._api_get('url'))
self.assertIn('ERROR:test_a2c:CAhandler._api_get() returned error: exc_api_get', lcm.output)
self.assertIn('ERROR:test_a2c:request_operation returned error: exc_api_get', lcm.output)

@patch.object(requests, 'put')
def test_016__api_put(self, mock_req):
Expand All @@ -171,7 +171,7 @@ def test_017__api_put(self, mock_req):
mock_req.return_value = mockresponse
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual(('status_code', "'str' object is not callable"), self.cahandler._api_put('url', 'data'))
self.assertIn("ERROR:test_a2c:CAhandler._api_put() returned error during json parsing: 'str' object is not callable", lcm.output)
self.assertIn("ERROR:test_a2c:request_operation returned error during json parsing: 'str' object is not callable", lcm.output)

@patch('requests.put')
def test_018__api_put(self, mock_req):
Expand All @@ -190,7 +190,7 @@ def test_019__api_put(self, mock_req):
mock_req.side_effect = Exception('exc_api_put')
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual((500, 'exc_api_put'), self.cahandler._api_put('url', 'data'))
self.assertIn('ERROR:test_a2c:CAhandler._api_put() returned error: exc_api_put', lcm.output)
self.assertIn('ERROR:test_a2c:request_operation returned error: exc_api_put', lcm.output)

def test_020__config_check(self):
""" test _config_check() """
Expand Down Expand Up @@ -653,7 +653,7 @@ def test_051_csr_check(self, mock_dlchk, mock_ehichk):

@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_response_parse')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_send')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_check')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._config_check')
def test_052_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend, mock_orderparse):
Expand All @@ -672,7 +672,7 @@ def test_052_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend,

@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_response_parse')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_send')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_check')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._config_check')
def test_053_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend, mock_orderparse):
Expand All @@ -691,7 +691,7 @@ def test_053_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend,

@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_response_parse')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_send')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_check')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._config_check')
def test_054_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend, mock_orderparse):
Expand All @@ -710,7 +710,7 @@ def test_054_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend,

@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_response_parse')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_send')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_check')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._config_check')
def test_055_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend, mock_orderparse):
Expand All @@ -729,7 +729,7 @@ def test_055_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend,

@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_response_parse')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._order_send')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_lookup')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._csr_check')
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._config_check')
def test_056_enroll(self, mock_cfgchk, mock_csrchk, mock_cnget, mock_ordersend, mock_orderparse):
Expand Down Expand Up @@ -762,41 +762,14 @@ def test_058_revoke(self, mock_serial, mock_put):
mock_put.return_value = ('code', 'content')
self.assertEqual((500, None, 'Failed to parse certificate serial'), self.cahandler.revoke('cert'))

@patch('examples.ca_handler.digicert_ca_handler.csr_san_get')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_get')
def test_059__csr_cn_lookup(self, mock_cnget, mock_san_get):
""" test _csr_cn_lookup() """
mock_cnget.return_value = 'cn'
mock_san_get.return_value = ['foo:san1', 'foo:san2']
self.assertEqual('cn', self.cahandler._csr_cn_lookup('csr'))

@patch('examples.ca_handler.digicert_ca_handler.csr_san_get')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_get')
def test_060__csr_cn_lookup(self, mock_cnget, mock_san_get):
""" test _csr_cn_lookup() """
mock_cnget.return_value = None
mock_san_get.return_value = ['foo:san1', 'foo:san2']
self.assertEqual('san1', self.cahandler._csr_cn_lookup('csr'))

@patch('examples.ca_handler.digicert_ca_handler.csr_san_get')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_get')
def test_061__csr_cn_lookup(self, mock_cnget, mock_san_get):
""" test _csr_cn_lookup() """
mock_cnget.return_value = None
mock_san_get.return_value = ['foosan1', 'foo:san2']
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertEqual('san2', self.cahandler._csr_cn_lookup('csr'))
self.assertIn('ERROR:test_a2c:CAhandler._csr_cn_lookup() split failed: list index out of range', lcm.output)

@patch('examples.ca_handler.digicert_ca_handler.csr_san_get')
@patch('examples.ca_handler.digicert_ca_handler.csr_cn_get')
def test_062__csr_cn_lookup(self, mock_cnget, mock_san_get):
""" test _csr_cn_lookup() """
mock_cnget.return_value = None
mock_san_get.return_value = None
with self.assertLogs('test_a2c', level='INFO') as lcm:
self.assertFalse(self.cahandler._csr_cn_lookup('csr'))
self.assertIn('ERROR:test_a2c:CAhandler._csr_cn_lookup() no SANs found in CSR', lcm.output)
@patch('examples.ca_handler.digicert_ca_handler.CAhandler._api_put')
@patch('examples.ca_handler.digicert_ca_handler.cert_serial_get')
def test_059_revoke(self, mock_serial, mock_put):
""" test revoke() """
mock_serial.return_value = 'serial'
mock_put.return_value = (204, 'content')
self.assertEqual((200, None, 'content'), self.cahandler.revoke('cert'))


if __name__ == '__main__':

Expand Down
Loading

0 comments on commit 0275801

Please sign in to comment.