Skip to content

Commit

Permalink
make cache prefix key more specific, remove some value checks in meth…
Browse files Browse the repository at this point in the history
…ods/func involved in the encrypt, decrypt flow
  • Loading branch information
erikvw committed Mar 21, 2024
1 parent 530c76a commit 3eee158
Show file tree
Hide file tree
Showing 20 changed files with 385 additions and 254 deletions.
8 changes: 6 additions & 2 deletions django_crypto_fields/cipher/cipher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@

class Cipher:
"""A class that given a value builds a cipher of the format
hash_prefix + hashed_value + cipher_prefix + secret.
hash_prefix + hashed_value + cipher_prefix + secret.
.
For example:
enc1:::234234ed234a24enc2::\x0e\xb9\xae\x13s\x8d
\xe7O\xbb\r\x99.
The secret is encrypted using the passed `encrypt` callable.
The secret is encrypted using the passed `encrypt` callable.
"""

def __init__(
Expand Down
92 changes: 28 additions & 64 deletions django_crypto_fields/cryptor.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
from __future__ import annotations

import binascii
from typing import TYPE_CHECKING

from Cryptodome import Random
from Cryptodome.Cipher import AES as AES_CIPHER

from .constants import AES, ENCODING, LOCAL_MODE, PRIVATE, PUBLIC, RSA
from .constants import AES, ENCODING, PRIVATE, PUBLIC, RSA
from .exceptions import EncryptionError
from .keys import encryption_keys
from .utils import get_keypath_from_settings
from .utils import (
append_padding,
get_keypath_from_settings,
remove_padding,
safe_encode_utf8,
)

if TYPE_CHECKING:
from Cryptodome.Cipher._mode_cbc import CbcMode
from Cryptodome.Cipher.PKCS1_OAEP import PKCS1OAEP_Cipher

__all__ = ["Cryptor"]


class Cryptor:
"""Base class for all classes providing RSA and AES encryption
Expand All @@ -24,7 +30,7 @@ class Cryptor:
of this except the filenames are replaced with the actual keys.
"""

def __init__(self, algorithm: AES | RSA, access_mode: PRIVATE | LOCAL_MODE = None) -> None:
def __init__(self, algorithm, access_mode) -> None:
self.algorithm = algorithm
self.aes_encryption_mode: int = AES_CIPHER.MODE_CBC
aes_key_attr: str = "_".join([AES, access_mode, PRIVATE, "key"])
Expand All @@ -36,77 +42,35 @@ def __init__(self, algorithm: AES | RSA, access_mode: PRIVATE | LOCAL_MODE = Non
self.encrypt = getattr(self, f"_{self.algorithm.lower()}_encrypt")
self.decrypt = getattr(self, f"_{self.algorithm.lower()}_decrypt")

def get_with_padding(self, plaintext: str | bytes, block_size: int) -> bytes:
"""Return string padded so length is a multiple of the block size.
* store length of padding the last hex value.
* if padding is 0, pad as if padding is 16.
* AES_CIPHER.MODE_CFB should not be used, but was used
without padding in the past. Continue to skip padding
for this mode.
"""
try:
plaintext = plaintext.encode(ENCODING)
except AttributeError:
pass
if self.aes_encryption_mode == AES_CIPHER.MODE_CFB:
padding_length = 0
else:
padding_length = (block_size - len(plaintext) % block_size) % block_size
padding_length = padding_length or 16
padded = (
plaintext
+ (b"\x00" * (padding_length - 1))
+ binascii.a2b_hex(str(padding_length).zfill(2))
)
if len(padded) % block_size > 0:
multiple = len(padded) / block_size
raise EncryptionError(
f"Padding error, got padded string not a multiple "
f"of {block_size}. Got {multiple}"
)
return padded

def get_without_padding(self, plaintext: str | bytes) -> bytes:
"""Return original plaintext without padding.
Length of padding is stored in last two characters of
plaintext.
"""
if self.aes_encryption_mode == AES_CIPHER.MODE_CFB:
return plaintext
padding_length = int(binascii.b2a_hex(plaintext[-1:]))
if not padding_length:
return plaintext[:-1]
return plaintext[:-padding_length]

def _aes_encrypt(self, plaintext: str | bytes) -> bytes:
def _aes_encrypt(self, value: str | bytes) -> bytes:
encoded_value = safe_encode_utf8(value)
iv: bytes = Random.new().read(AES_CIPHER.block_size)
cipher: CbcMode = AES_CIPHER.new(self.aes_key, self.aes_encryption_mode, iv)
padded_plaintext = self.get_with_padding(plaintext, cipher.block_size)
return iv + cipher.encrypt(padded_plaintext)
encoded_value = append_padding(encoded_value, cipher.block_size)
secret = iv + cipher.encrypt(encoded_value)
return secret

def _aes_decrypt(self, ciphertext: bytes) -> str:
iv = ciphertext[: AES_CIPHER.block_size]
def _aes_decrypt(self, secret: bytes) -> str:
iv = secret[: AES_CIPHER.block_size]
cipher: CbcMode = AES_CIPHER.new(self.aes_key, self.aes_encryption_mode, iv)
plaintext = cipher.decrypt(ciphertext)[AES_CIPHER.block_size :]
return self.get_without_padding(plaintext).decode()
encoded_value = cipher.decrypt(secret)[AES_CIPHER.block_size :]
encoded_value = remove_padding(encoded_value)
value = encoded_value.decode()
return value

def _rsa_encrypt(self, plaintext: str | bytes) -> bytes:
try:
plaintext = plaintext.encode(ENCODING)
except AttributeError:
pass
def _rsa_encrypt(self, value: str | bytes) -> bytes:
encoded_value = safe_encode_utf8(value)
try:
ciphertext = self.rsa_public_key.encrypt(plaintext)
secret = self.rsa_public_key.encrypt(encoded_value)
except (ValueError, TypeError) as e:
raise EncryptionError(f"RSA encryption failed for value. Got '{e}'")
return ciphertext
return secret

def _rsa_decrypt(self, ciphertext: bytes) -> str:
def _rsa_decrypt(self, secret: bytes) -> str:
try:
plaintext = self.rsa_private_key.decrypt(ciphertext)
encoded_value = self.rsa_private_key.decrypt(secret)
except ValueError as e:
raise EncryptionError(
f"{e} Using RSA from key_path=`{get_keypath_from_settings()}`."
)
return plaintext.decode(ENCODING)
return encoded_value.decode(ENCODING)
Loading

0 comments on commit 3eee158

Please sign in to comment.