Skip to content

Commit

Permalink
store runtime secrets in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvw committed Mar 20, 2024
1 parent 77609ae commit 530c76a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 36 deletions.
44 changes: 22 additions & 22 deletions django_crypto_fields/field_cryptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from Cryptodome.Cipher import AES as AES_CIPHER
from django.apps import apps as django_apps
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist

from .cipher import Cipher, CipherParser
from .cipher import Cipher
from .constants import AES, CIPHER_PREFIX, ENCODING, HASH_PREFIX, PRIVATE, RSA, SALT
from .cryptor import Cryptor
from .exceptions import EncryptionError, EncryptionKeyError, InvalidEncryptionAlgorithm
Expand Down Expand Up @@ -39,7 +40,7 @@ def __init__(self, algorithm: str, access_mode: str):
self.algorithm = algorithm
self.access_mode = access_mode
self.aes_encryption_mode = AES_CIPHER.MODE_CBC
self.cipher_buffer_key = f"{self.algorithm}_{self.access_mode}"
self.cipher_buffer_key = b"{self.algorithm}_{self.access_mode}"
self.cipher_buffer = {self.cipher_buffer_key: {}}
self.keys = encryption_keys
self.cryptor = self.cryptor_cls(algorithm=algorithm, access_mode=access_mode)
Expand Down Expand Up @@ -123,13 +124,10 @@ def using(self):

def update_crypt(self, cipher: Cipher):
"""Updates Crypt model and cipher_buffer."""
self.cipher_buffer[self.cipher_buffer_key].update({cipher.hashed_value: cipher.secret})
try:
crypt = self.crypt_model_cls.objects.using(self.using).get(
hash=cipher.hashed_value, algorithm=self.algorithm, mode=self.access_mode
)
crypt.secret = cipher.secret
crypt.save()
except ObjectDoesNotExist:
self.crypt_model_cls.objects.using(self.using).create(
hash=cipher.hashed_value,
Expand All @@ -138,6 +136,12 @@ def update_crypt(self, cipher: Cipher):
cipher_mode=self.aes_encryption_mode,
mode=self.access_mode,
)
else:
crypt.secret = cipher.secret
crypt.save()
cache.set(self.cipher_buffer_key + cipher.hashed_value, cipher.secret)
# self.cipher_buffer[self.cipher_buffer_key].update(
# {cipher.hashed_value: cipher.secret})

def get_prep_value(self, value: str | bytes | None) -> str | bytes | None:
"""Returns the prefix + hash_value as stored in the DB table column of
Expand All @@ -154,52 +158,48 @@ def get_prep_value(self, value: str | bytes | None) -> str | bytes | None:
hash_with_prefix = safe_decode(hash_with_prefix)
return hash_with_prefix or value

def fetch_secret(self, hash_with_prefix: bytes):
def fetch_secret(self, hash_with_prefix: bytes) -> bytes | None:
"""Fetch the secret from the DB or the buffer using
the hashed_value as the lookup.
If not found in buffer, lookup in DB and update the buffer.
A secret is the segment to follow the `enc2:::`.
"""
secret = None
hash_with_prefix = safe_encode_utf8(hash_with_prefix)
hashed_value = hash_with_prefix[len(HASH_PREFIX) :][: self.hash_size] or None
secret = self.cipher_buffer[self.cipher_buffer_key].get(hashed_value)
if hashed_value := hash_with_prefix[len(HASH_PREFIX) :][: self.hash_size] or None:
secret = cache.get(self.cipher_buffer_key + hashed_value, None)
# secret = self.cipher_buffer[self.cipher_buffer_key].get(hashed_value)
if not secret:
try:
data = (
self.crypt_model_cls.objects.using(self.using)
.values("secret")
.get(hash=hashed_value, algorithm=self.algorithm, mode=self.access_mode)
)
secret = data.get("secret")
self.cipher_buffer[self.cipher_buffer_key].update({hashed_value: secret})
except ObjectDoesNotExist:
raise EncryptionError(
f"EncryptionError. Failed to get secret for given {self.algorithm} "
f"{self.access_mode} hash. Got '{str(hash_with_prefix)}'"
)
else:
secret = data.get("secret")
cache.set(self.cipher_buffer_key + hashed_value, secret)
# self.cipher_buffer[self.cipher_buffer_key].update({hashed_value: secret})
return secret

def is_encrypted(self, value: str | bytes | None) -> bool:
@staticmethod
def is_encrypted(value: str | bytes | None) -> bool:
"""Returns True if value is encrypted.
An encrypted value starts with the hash_prefix.
Inspects a value that is:
* a string value -> False
* a well-formed hash
* a well-formed hash_prefix + hash -> True
* a well-formed hash + secret.
"""
is_encrypted = False
if value is not None:
value = safe_encode_utf8(value)
if value.startswith(safe_encode_utf8(HASH_PREFIX)):
p = CipherParser(value, self.salt_key)
p.validate_hashed_value()
is_encrypted = True
return is_encrypted
return True
return False

def mask(self, value, mask=None):
"""Returns 'mask' if value is encrypted."""
Expand Down
15 changes: 1 addition & 14 deletions django_crypto_fields/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from django.apps import apps as django_apps
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from .constants import CIPHER_PREFIX, ENCODING, HASH_ALGORITHM, HASH_PREFIX, HASH_ROUNDS
from .exceptions import MalformedCiphertextError
Expand Down Expand Up @@ -43,19 +42,7 @@ def get_crypt_model() -> str:


def get_crypt_model_cls() -> Type[Crypt]:
"""Return the Crypt model that is active in this project."""
try:
return django_apps.get_model(get_crypt_model(), require_ready=False)
except ValueError:
raise ImproperlyConfigured(
"Invalid. `settings.DJANGO_CRYPTO_FIELDS_MODEL` must refer to a model "
f"using lower_label format. Got {get_crypt_model()}."
)
except LookupError:
raise ImproperlyConfigured(
"Invalid. `settings.DJANGO_CRYPTO_FIELDS_MODEL` refers to a model "
f"that has not been installed. Got {get_crypt_model()}."
)
return django_apps.get_model(get_crypt_model())


def get_auto_create_keys_from_settings() -> bool:
Expand Down

0 comments on commit 530c76a

Please sign in to comment.