Skip to content

Commit

Permalink
[fix] exception handling during ca_key and cert_load
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Jun 30, 2020
1 parent 461f5bb commit 37f0110
Showing 1 changed file with 50 additions and 48 deletions.
98 changes: 50 additions & 48 deletions examples/ca_handler/xca_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __exit__(self, *args):

def _ca_cert_load(self):
""" load ca key from database """
self.logger.debug('CAhandler._ca_cert_load()')
self.logger.debug('CAhandler._ca_cert_load({0})'.format(self.issuing_ca_name))

# query database for key
self._db_open()
Expand All @@ -47,6 +47,7 @@ def _ca_cert_load(self):
try:
db_result = dict_from_row(self.cursor.fetchone())
except BaseException:
self.logger.error('cert lookup failed: {0}'.format(self.cursor.fetchone()))
db_result = {}
self._db_close()

Expand All @@ -58,26 +59,26 @@ def _ca_cert_load(self):
ca_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, b64_decode(self.logger, db_result['cert']))
ca_id = db_result['id']
except BaseException as err_:
self.logger.error('CAhandler._ca_key_load() failed with error: {0}'.format(err_))
self.logger.error('CAhandler._ca_cert_load() failed with error: {0}'.format(err_))
return (ca_cert, ca_id)

def _ca_key_load(self):
""" load ca key from database """
self.logger.debug('CAhandler._ca_key_load()')
self.logger.debug('CAhandler._ca_key_load({0})'.format(self.issuing_ca_name))

# query database for key
self._db_open()
pre_statement = '''SELECT * from view_private WHERE name LIKE ?'''
self.logger.debug('pre_statement: {0}:{1}'.format(pre_statement, self.issuing_ca_name))
self.cursor.execute(pre_statement, [self.issuing_ca_name])
try:
db_result = dict_from_row(self.cursor.fetchone())
except BaseException as err_:
self.logger.error('key lookup failed: {0}'.format(self.cursor.fetchone()))
db_result = {}
self._db_close()

ca_key = None
if 'private' in db_result:
if db_result and 'private' in db_result:
try:
private_key = '-----BEGIN ENCRYPTED PRIVATE KEY-----\n{0}\n-----END ENCRYPTED PRIVATE KEY-----'.format(db_result['private'])
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, private_key, convert_string_to_byte(self.passphrase))
Expand Down Expand Up @@ -107,7 +108,7 @@ def _config_check(self):
if self.xdb_file:
if not os.path.exists(self.xdb_file):
error = 'xdb_file {0} does not exist'.format(self.xdb_file)
self.xdb_file = None
self.xdb_file = None
else:
error = 'xdb_file must be specified in config file'

Expand Down Expand Up @@ -401,48 +402,49 @@ def enroll(self, csr):
# load ca cert and key
(ca_key, ca_cert, ca_id) = self._ca_load()

# load request
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)

# copy cn of request
subject = req.get_subject()
# rewrite CN if required
if not subject.CN:
self.logger.debug('rewrite CN to {0}'.format(request_name))
subject.CN = request_name

cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(self.cert_validity_days * 86400)
cert.set_issuer(ca_cert.get_subject())
cert.set_subject(subject)
cert.set_pubkey(req.get_pubkey())
cert.set_serial_number(uuid.uuid4().int & (1<<63)-1)
cert.set_version(2)
cert.add_extensions(req.get_extensions())
default_extension_list = [
crypto.X509Extension(convert_string_to_byte('subjectKeyIdentifier'), False, convert_string_to_byte('hash'), subject=cert),
crypto.X509Extension(convert_string_to_byte('keyUsage'), True, convert_string_to_byte('digitalSignature,keyEncipherment')),
crypto.X509Extension(convert_string_to_byte('authorityKeyIdentifier'), False, convert_string_to_byte('keyid:always'), issuer=ca_cert),
crypto.X509Extension(convert_string_to_byte('basicConstraints'), True, convert_string_to_byte('CA:FALSE')),
crypto.X509Extension(convert_string_to_byte('extendedKeyUsage'), False, convert_string_to_byte('clientAuth,serverAuth')),
]

# add default extensions
cert.add_extensions(default_extension_list)
# sign csr
cert.sign(ca_key, 'sha256')
serial = cert.get_serial_number()

# get hsshes
issuer_hash = ca_cert.subject_name_hash() & 0x7fffffff
name_hash = cert.subject_name_hash() & 0x7fffffff

# store certificate
self._store_cert(ca_id, request_name, '{:X}'.format(serial), convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert))), name_hash, issuer_hash)

cert_bundle = self._pemcertchain_generate(convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)), convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, ca_cert)))
cert_raw = convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)))
if ca_key and ca_cert:
# load request
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)

# copy cn of request
subject = req.get_subject()
# rewrite CN if required
if not subject.CN:
self.logger.debug('rewrite CN to {0}'.format(request_name))
subject.CN = request_name

cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(self.cert_validity_days * 86400)
cert.set_issuer(ca_cert.get_subject())
cert.set_subject(subject)
cert.set_pubkey(req.get_pubkey())
cert.set_serial_number(uuid.uuid4().int & (1<<63)-1)
cert.set_version(2)
cert.add_extensions(req.get_extensions())
default_extension_list = [
crypto.X509Extension(convert_string_to_byte('subjectKeyIdentifier'), False, convert_string_to_byte('hash'), subject=cert),
crypto.X509Extension(convert_string_to_byte('keyUsage'), True, convert_string_to_byte('digitalSignature,keyEncipherment')),
crypto.X509Extension(convert_string_to_byte('authorityKeyIdentifier'), False, convert_string_to_byte('keyid:always'), issuer=ca_cert),
crypto.X509Extension(convert_string_to_byte('basicConstraints'), True, convert_string_to_byte('CA:FALSE')),
crypto.X509Extension(convert_string_to_byte('extendedKeyUsage'), False, convert_string_to_byte('clientAuth,serverAuth')),
]

# add default extensions
cert.add_extensions(default_extension_list)
# sign csr
cert.sign(ca_key, 'sha256')
serial = cert.get_serial_number()

# get hsshes
issuer_hash = ca_cert.subject_name_hash() & 0x7fffffff
name_hash = cert.subject_name_hash() & 0x7fffffff

# store certificate
self._store_cert(ca_id, request_name, '{:X}'.format(serial), convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert))), name_hash, issuer_hash)

cert_bundle = self._pemcertchain_generate(convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)), convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, ca_cert)))
cert_raw = convert_byte_to_string(b64_encode(self.logger, crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)))

self.logger.debug('Certificate.enroll() ended')
return(error, cert_bundle, cert_raw, None)
Expand Down

0 comments on commit 37f0110

Please sign in to comment.