Skip to content

Commit

Permalink
[feat] #114 - adjust cert validity
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Dec 14, 2023
1 parent e10e81c commit a4c3571
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 41 deletions.
78 changes: 76 additions & 2 deletions examples/ca_handler/openssl_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, debug: bool = False, logger: object = None):
}
self.ca_cert_chain_list = []
self.cert_validity_days = 365
self.cert_validity_adjust = False
self.openssl_conf = None
self.cert_save_path = None
self.save_cert_as_hex = False
Expand Down Expand Up @@ -334,6 +335,10 @@ def _config_policy_load(self, config_dic: Dict[str, str]):
self.cn_enforce = config_dic.getboolean('CAhandler', 'cn_enforce', fallback=False)
except Exception:
self.logger.error('CAhandler._config_load() variable cn_enforce cannot be parsed')
try:
self.cert_validity_adjust = config_dic.getboolean('CAhandler', 'cert_validity_adjust', fallback=False)
except Exception:
self.logger.error('CAhandler._config_load() variable cert_validity_adjust cannot be parsed')

self.logger.debug('CAhandler._config_policy_load() ended')

Expand Down Expand Up @@ -508,15 +513,84 @@ def _string_wlbl_check(self, entry: str, white_list: List[str], black_list: List
self.logger.debug('CAhandler._string_wlbl_check({0}) ended with: {1}'.format(entry, chk_result))
return chk_result

def _cert_expiry_get(self, cert):
""" get expiry date of certificate """
self.logger.debug('CAhandler._cert_expiry_get()')

# cert.not_valid_after - datetime.datetime.utcnow()
expiry_date = cert.not_valid_after

self.logger.debug('CAhandler._cert_expiry_get() ended')
return expiry_date

def _cacert_expiry_get(self):
""" get closesd expiry date of issuing CA """
self.logger.debug('CAhandler._cacert_expiry_get()')

ca_list = self.ca_cert_chain_list
if self.issuer_dict['issuing_ca_cert']:
ca_list.append(self.issuer_dict['issuing_ca_cert'])

# ca_cert.not_valid_after - datetime.datetime.utcnow()
expiry_days = 0
cert = None

for ca_cert in ca_list:
if ca_cert:
if os.path.exists(ca_cert):
with open(ca_cert, 'rb') as fso:
ca_cert = x509.load_pem_x509_certificate(fso.read(), backend=default_backend())
_tmp_expiry_days = (self._cert_expiry_get(ca_cert) - datetime.datetime.utcnow()).days
if not expiry_days or _tmp_expiry_days < expiry_days:
self.logger.debug('CAhandler._cacert_expiry_get(): set expiry_days to {0}'.format(_tmp_expiry_days))
expiry_days = _tmp_expiry_days
cert = ca_cert
else:
self.logger.error('CAhandler._cacert_expiry_get(): file {0} does not exist'.format(ca_cert))

self.logger.debug('CAhandler._cacert_expiry_get() ended')
return expiry_days, cert

def _certexpiry_date_default(self) -> datetime.datetime:
""" set certificate validity """
self.logger.debug('CAhandler._certexpiry_date_default()')

# default cert validity is taken from config
cert_validity = datetime.datetime.utcnow() + datetime.timedelta(days=self.cert_validity_days)

self.logger.debug('CAhandler._certexpiry_date_default() ended')
return cert_validity

def _certexpiry_date_set(self) -> datetime.datetime:
""" set certificate validity """
self.logger.debug('CAhandler._certexpiry_date_set()')

# default cert validity is taken from config
cert_validity = self._certexpiry_date_default()

if self.cert_validity_adjust:
# adjust validity to match the validity of the issuing CA

(ca_cert_validity, cert) = self._cacert_expiry_get()
if ca_cert_validity < self.cert_validity_days:
self.logger.info('CAhandler._certexpiry_date_set(): adjust validity to {0} days.'.format(ca_cert_validity))
cert_validity = cert.not_valid_after

self.logger.debug('CAhandler._certexpiry_date_set() ended')
return cert_validity

def _cert_signing_prep(self, ca_cert: object, req: object, subject: str) -> object:
""" enroll certificate """
# pylint: disable=R0914, R0915
self.logger.debug('CAhandler._cert_signing_prep()')

cert_validity = self._certexpiry_date_set()

# sign csr
builder = x509.CertificateBuilder()
builder = builder.not_valid_before(datetime.datetime.utcnow())
builder = builder.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=self.cert_validity_days))
builder = builder.not_valid_after(cert_validity)
# builder = builder.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=self.cert_validity_days))
builder = builder.issuer_name(ca_cert.subject)
builder = builder.subject_name(subject)
builder = builder.serial_number(uuid.uuid4().int)
Expand Down Expand Up @@ -643,7 +717,7 @@ def _crlobject_build(self, ca_cert: object, serial: int) -> Tuple[x509.Certifica
builder = builder.issuer_name(crl.issuer)
# add crl certificates from file to the new crl object
for revserial in crl:
builder = builder.add_revoked_certificate(revserial)
builder = builder.add_revoked_certificate(revserial) # pragma: no cover

# see if the cert to be revokek already in the list
ret = crl.get_revoked_certificate_by_serial_number(serial)
Expand Down
Loading

0 comments on commit a4c3571

Please sign in to comment.