Skip to content

Commit

Permalink
add classes to build and parse ciphers
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvw committed Mar 20, 2024
1 parent 14ef68e commit 77609ae
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 301 deletions.
4 changes: 4 additions & 0 deletions django_crypto_fields/cipher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .cipher import Cipher
from .cipher_parser import CipherParser

__all__ = ["Cipher", "CipherParser"]
44 changes: 44 additions & 0 deletions django_crypto_fields/cipher/cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from typing import Callable

from ..constants import CIPHER_PREFIX, HASH_PREFIX
from ..utils import make_hash, safe_encode_utf8

__all__ = ["Cipher"]


class Cipher:
"""A class that given a value builds a cipher of the format
hash_prefix + hashed_value + cipher_prefix + secret.
The secret is encrypted using the passed `encrypt` callable.
"""

def __init__(
self,
value: str | bytes,
salt_key: bytes,
encrypt: Callable[[bytes], bytes] | None = None,
):
encoded_value = safe_encode_utf8(value)
self.hash_prefix = b""
self.hashed_value = b""
self.cipher_prefix = b""
self.secret = b""
if salt_key:
self.hash_prefix: bytes = safe_encode_utf8(HASH_PREFIX)
self.hashed_value: bytes = make_hash(encoded_value, salt_key)
if encrypt:
self.secret = encrypt(encoded_value)
self.cipher_prefix: bytes = safe_encode_utf8(CIPHER_PREFIX)

@property
def cipher(self) -> bytes:
return self.hash_prefix + self.hashed_value + self.cipher_prefix + self.secret

def hash_with_prefix(self) -> bytes:
return self.hash_prefix + self.hashed_value

def secret_with_prefix(self) -> bytes:
return self.cipher_prefix + self.secret
57 changes: 57 additions & 0 deletions django_crypto_fields/cipher/cipher_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

from ..constants import CIPHER_PREFIX, HASH_PREFIX
from ..exceptions import MalformedCiphertextError
from ..utils import make_hash, safe_encode_utf8

__all__ = ["CipherParser"]


class CipherParser:
def __init__(self, cipher: bytes, salt_key: bytes | None = None):
self._cipher_prefix = None
self._hash_prefix = None
self._hashed_value = None
self._secret = None
self.cipher = safe_encode_utf8(cipher)
self.salt_key = salt_key
self.validate_hashed_value()
self.validate_secret()

@property
def hash_prefix(self) -> bytes | None:
if self.cipher:
hash_prefix = safe_encode_utf8(HASH_PREFIX)
self._hash_prefix = hash_prefix if self.cipher.startswith(hash_prefix) else None
return self._hash_prefix

@property
def cipher_prefix(self) -> bytes | None:
if self.cipher:
cipher_prefix = safe_encode_utf8(CIPHER_PREFIX)
self._cipher_prefix = cipher_prefix if cipher_prefix in self.cipher else None
return self._cipher_prefix

@property
def hashed_value(self) -> bytes | None:
if self.cipher and self.cipher.startswith(self.hash_prefix):
self._hashed_value = self.cipher.split(self.hash_prefix)[1].split(
self.cipher_prefix
)[0]
return self._hashed_value

@property
def secret(self) -> bytes | None:
if self.cipher and safe_encode_utf8(CIPHER_PREFIX) in self.cipher:
self._secret = self.cipher.split(self.cipher_prefix)[1]
return self._secret

def validate_hashed_value(self) -> None:
if self.hash_prefix and not self.hashed_value:
raise MalformedCiphertextError("Invalid hashed_value. Got None.")
elif self.salt_key and len(self.hashed_value) != len(make_hash("Foo", self.salt_key)):
raise MalformedCiphertextError("Invalid hashed_value. Incorrect size.")

def validate_secret(self) -> None:
if self.cipher_prefix and not self.secret:
raise MalformedCiphertextError("Invalid secret. Got None.")
42 changes: 20 additions & 22 deletions django_crypto_fields/cryptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from Cryptodome import Random
from Cryptodome.Cipher import AES as AES_CIPHER

from .constants import AES, ENCODING, PRIVATE, PUBLIC, RSA
from .constants import AES, ENCODING, LOCAL_MODE, PRIVATE, PUBLIC, RSA
from .exceptions import EncryptionError
from .keys import encryption_keys
from .utils import get_keypath_from_settings
Expand All @@ -15,8 +15,6 @@
from Cryptodome.Cipher._mode_cbc import CbcMode
from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher

from .keys import Keys


class Cryptor:
"""Base class for all classes providing RSA and AES encryption
Expand All @@ -26,9 +24,17 @@ class Cryptor:
of this except the filenames are replaced with the actual keys.
"""

def __init__(self):
def __init__(self, algorithm: AES | RSA, access_mode: PRIVATE | LOCAL_MODE = None) -> None:
self.algorithm = algorithm
self.aes_encryption_mode: int = AES_CIPHER.MODE_CBC
self.keys: Keys = encryption_keys
aes_key_attr: str = "_".join([AES, access_mode, PRIVATE, "key"])
self.aes_key: bytes = getattr(encryption_keys, aes_key_attr)
rsa_key_attr = "_".join([RSA, access_mode, PUBLIC, "key"])
self.rsa_public_key: PKCS1OAEP_Cipher = getattr(encryption_keys, rsa_key_attr)
rsa_key_attr = "_".join([RSA, access_mode, PRIVATE, "key"])
self.rsa_private_key: PKCS1OAEP_Cipher = getattr(encryption_keys, rsa_key_attr)
self.encrypt = getattr(self, f"_{self.algorithm.lower()}_encrypt")
self.decrypt = getattr(self, f"_{self.algorithm.lower()}_decrypt")

def get_with_padding(self, plaintext: str | bytes, block_size: int) -> bytes:
"""Return string padded so length is a multiple of the block size.
Expand Down Expand Up @@ -73,42 +79,34 @@ def get_without_padding(self, plaintext: str | bytes) -> bytes:
return plaintext[:-1]
return plaintext[:-padding_length]

def aes_encrypt(self, plaintext: str | bytes, mode: str) -> bytes:
aes_key_attr: str = "_".join([AES, mode, PRIVATE, "key"])
aes_key: bytes = getattr(self.keys, aes_key_attr)
def _aes_encrypt(self, plaintext: str | bytes) -> bytes:
iv: bytes = Random.new().read(AES_CIPHER.block_size)
cipher: CbcMode = AES_CIPHER.new(aes_key, self.aes_encryption_mode, iv)
cipher: CbcMode = AES_CIPHER.new(self.aes_key, self.aes_encryption_mode, iv)
padded_plaintext = self.get_with_padding(plaintext, cipher.block_size)
return iv + cipher.encrypt(padded_plaintext)

def aes_decrypt(self, ciphertext: bytes, mode: str) -> str:
aes_key_attr: str = "_".join([AES, mode, PRIVATE, "key"])
aes_key: bytes = getattr(self.keys, aes_key_attr)
def _aes_decrypt(self, ciphertext: bytes) -> str:
iv = ciphertext[: AES_CIPHER.block_size]
cipher: CbcMode = AES_CIPHER.new(aes_key, self.aes_encryption_mode, iv)
cipher: CbcMode = AES_CIPHER.new(self.aes_key, self.aes_encryption_mode, iv)
plaintext = cipher.decrypt(ciphertext)[AES_CIPHER.block_size :]
return self.get_without_padding(plaintext).decode()

def rsa_encrypt(self, plaintext: str | bytes, mode: int) -> bytes:
rsa_key_attr = "_".join([RSA, mode, PUBLIC, "key"])
rsa_key: PKCS1OAEP_Cipher = getattr(self.keys, rsa_key_attr)
def _rsa_encrypt(self, plaintext: str | bytes) -> bytes:
try:
plaintext = plaintext.encode(ENCODING)
except AttributeError:
pass
try:
ciphertext = rsa_key.encrypt(plaintext)
ciphertext = self.rsa_public_key.encrypt(plaintext)
except (ValueError, TypeError) as e:
raise EncryptionError(f"RSA encryption failed for value. Got '{e}'")
return ciphertext

def rsa_decrypt(self, ciphertext: bytes, mode: str) -> str:
rsa_key_attr = "_".join([RSA, mode, PRIVATE, "key"])
rsa_key: PKCS1OAEP_Cipher = getattr(self.keys, rsa_key_attr)
def _rsa_decrypt(self, ciphertext: bytes) -> str:
try:
plaintext = rsa_key.decrypt(ciphertext)
plaintext = self.rsa_private_key.decrypt(ciphertext)
except ValueError as e:
raise EncryptionError(
f"{e} Using {rsa_key_attr} from key_path=`{get_keypath_from_settings()}`."
f"{e} Using RSA from key_path=`{get_keypath_from_settings()}`."
)
return plaintext.decode(ENCODING)
Loading

0 comments on commit 77609ae

Please sign in to comment.