Skip to content

Commit

Permalink
MAINT: Prepare for adding AES support via #1816
Browse files Browse the repository at this point in the history
Full credit goes to exiledkingcc

This PR was only made to make it easier to merge the other changes /
to avoid merge conflicts.

Co-authored-by: exiledkingcc <exiledkingcc@gmail.com>
  • Loading branch information
MartinThoma and exiledkingcc committed Jun 25, 2023
1 parent bd904ea commit 77e3932
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 7 deletions.
146 changes: 143 additions & 3 deletions pypdf/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import hashlib
import secrets
import struct
from enum import IntEnum
from enum import Enum, IntEnum
from typing import Any, Dict, Optional, Tuple, Union, cast

from ._utils import logger_warning
Expand All @@ -36,6 +36,8 @@
ArrayObject,
ByteStringObject,
DictionaryObject,
NameObject,
NumberObject,
PdfObject,
StreamObject,
TextStringObject,
Expand Down Expand Up @@ -175,8 +177,28 @@ def __init__(
self.efCrypt = efCrypt

def encrypt_object(self, obj: PdfObject) -> PdfObject:
    """
    Return an encrypted counterpart of ``obj``.

    The input object is not modified; a new object is built for every
    type that is handled here.

    Args:
        obj: The object to encrypt.

    Returns:
        * ``ByteStringObject`` / ``TextStringObject``: a new
          ``ByteStringObject`` holding the bytes produced by
          ``self.strCrypt.encrypt``.
        * ``StreamObject``: a new ``StreamObject`` with the same dictionary
          entries and ``_data`` produced by ``self.stmCrypt.encrypt``.
        * ``DictionaryObject`` / ``ArrayObject``: a new container whose
          values are encrypted recursively.
        * Anything else: ``obj`` itself, unchanged.
    """
    if isinstance(obj, ByteStringObject):
        return ByteStringObject(self.strCrypt.encrypt(obj.original_bytes))

    if isinstance(obj, TextStringObject):
        # Text strings are serialized to bytes first; the encrypted result
        # is raw bytes and therefore returned as a ByteStringObject.
        return ByteStringObject(self.strCrypt.encrypt(obj.get_encoded_bytes()))

    # StreamObject is tested before DictionaryObject so that streams take
    # this branch rather than the generic dictionary one (same order as
    # the original chain).
    if isinstance(obj, StreamObject):
        encrypted_stream = StreamObject()
        # The stream's dictionary entries are copied as-is; only the
        # payload goes through the stream crypt filter.
        encrypted_stream.update(obj)
        encrypted_stream._data = self.stmCrypt.encrypt(obj._data)
        return encrypted_stream

    if isinstance(obj, DictionaryObject):
        encrypted_dict = DictionaryObject()
        for key, value in obj.items():
            encrypted_dict[key] = self.encrypt_object(value)
        return encrypted_dict

    if isinstance(obj, ArrayObject):
        encrypted_array = ArrayObject()
        for item in obj:
            encrypted_array.append(self.encrypt_object(item))
        return encrypted_array

    return obj

def decrypt_object(self, obj: PdfObject) -> PdfObject:
if isinstance(obj, (ByteStringObject, TextStringObject)):
Expand Down Expand Up @@ -737,6 +759,8 @@ def generate_values(
p: int,
metadata_encrypted: bool,
) -> Dict[Any, Any]:
user_password = user_password[:127]
owner_password = owner_password[:127]
u_value, ue_value = AlgV5.compute_U_value(R, user_password, key)
o_value, oe_value = AlgV5.compute_O_value(R, owner_password, key, u_value)
perms = AlgV5.compute_Perms_value(key, p, metadata_encrypted)
Expand Down Expand Up @@ -874,6 +898,15 @@ class PasswordType(IntEnum):
OWNER_PASSWORD = 2


class EncryptAlgorithm(tuple, Enum):  # noqa: SLOT001
    """
    Supported encryption algorithm variants.

    Every member is a ``(V, R, Length)`` tuple, so a member can be unpacked
    directly into those three values.
    """

    # (V, R, Length)
    RC4_40 = (1, 2, 40)
    RC4_128 = (2, 3, 128)
    AES_128 = (4, 4, 128)
    AES_256_R5 = (5, 5, 256)
    AES_256 = (5, 6, 256)


class EncryptionValues:
O: bytes # noqa
U: bytes
Expand Down Expand Up @@ -940,6 +973,14 @@ def __init__(
def is_decrypted(self) -> bool:
    """Return ``True`` unless the password type is ``PasswordType.NOT_DECRYPTED``."""
    still_locked = self._password_type == PasswordType.NOT_DECRYPTED
    return not still_locked

def encrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObject:
    """
    Encrypt ``obj`` with a crypt filter built for the given object id.

    Args:
        obj: The object to encrypt.
        idnum: Object number, passed to ``_make_crypt_filter``.
        generation: Generation number, passed to ``_make_crypt_filter``.

    Returns:
        ``obj`` itself when ``_is_encryption_object`` rejects it, otherwise
        the result of the crypt filter's ``encrypt_object``.
    """
    if self._is_encryption_object(obj):
        crypt_filter = self._make_crypt_filter(idnum, generation)
        return crypt_filter.encrypt_object(obj)

    # Rejected objects are returned as-is, so no crypt filter (and no
    # per-object key) is computed for them.
    return obj

def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObject:
# skip calculate key
if not self._is_encryption_object(obj):
Expand Down Expand Up @@ -1103,6 +1144,77 @@ def verify_v5(self, password: bytes) -> Tuple[bytes, PasswordType]:
logger_warning("ignore '/Perms' verify failed", __name__)
return key, rc

def write_entry(
    self, user_password: str, owner_password: Optional[str]
) -> DictionaryObject:
    """
    Compute the password-dependent values and build the encryption dictionary.

    As a side effect this sets ``self._key`` and the ``O``/``U`` values on
    ``self.values`` (plus ``OE``/``UE``/``Perms`` when ``V`` is above 4).

    Args:
        user_password: The user password; encoded with ``_encode_password``.
        owner_password: The owner password. When it is empty or ``None``
            the encoded user password is used in its place.

    Returns:
        A new ``DictionaryObject`` containing /V, /R, /Length, /P, /Filter,
        /O and /U; additionally /CF, /StmF and /StrF when ``V >= 4``, and
        /OE, /UE and /Perms when ``V >= 5``.
    """
    user_pwd = self._encode_password(user_password)
    owner_pwd = None
    if owner_password:
        owner_pwd = self._encode_password(owner_password)
    if owner_pwd is None:
        # No owner password given: fall back to the user password.
        owner_pwd = user_pwd

    if self.V > 4:
        # A fresh random key is generated; the password-dependent values
        # are derived from it by AlgV5.
        self._key = secrets.token_bytes(self.Length // 8)
        generated = AlgV5.generate_values(
            self.R, user_pwd, owner_pwd, self._key, self.P, self.EncryptMetadata
        )
        self.values.O = generated["/O"]
        self.values.U = generated["/U"]
        self.values.OE = generated["/OE"]
        self.values.UE = generated["/UE"]
        self.values.Perms = generated["/Perms"]
    else:
        self.compute_values_v4(user_pwd, owner_pwd)

    entry = DictionaryObject()
    numeric_fields = (
        ("/V", self.V),
        ("/R", self.R),
        ("/Length", self.Length),
        ("/P", self.P),
    )
    for field_name, number in numeric_fields:
        entry[NameObject(field_name)] = NumberObject(number)
    entry[NameObject("/Filter")] = NameObject("/Standard")
    # /EncryptMetadata is not written to the dictionary.

    entry[NameObject("/O")] = ByteStringObject(self.values.O)
    entry[NameObject("/U")] = ByteStringObject(self.values.U)

    if self.V >= 4:
        # TODO: allow different method
        std_cf = DictionaryObject()
        std_cf[NameObject("/AuthEvent")] = NameObject("/DocOpen")
        std_cf[NameObject("/CFM")] = NameObject(self.StmF)
        std_cf[NameObject("/Length")] = NumberObject(self.Length // 8)
        crypt_filters = DictionaryObject()
        crypt_filters[NameObject("/StdCF")] = std_cf
        entry[NameObject("/CF")] = crypt_filters
        for filter_field in ("/StmF", "/StrF"):
            entry[NameObject(filter_field)] = NameObject("/StdCF")
        # /EFF is ignored: no such entry is written.

    if self.V >= 5:
        entry[NameObject("/OE")] = ByteStringObject(self.values.OE)
        entry[NameObject("/UE")] = ByteStringObject(self.values.UE)
        entry[NameObject("/Perms")] = ByteStringObject(self.values.Perms)
    return entry

def compute_values_v4(self, user_password: bytes, owner_password: bytes) -> None:
    """
    Derive the O value, the key and the U value through ``AlgV4``.

    The results are stored on the instance: the key in ``self._key`` and
    the two values in ``self.values.O`` and ``self.values.U``.

    Args:
        user_password: Encoded user password.
        owner_password: Encoded owner password.
    """
    revision = self.R
    key_length = self.Length
    document_id = self.id1_entry

    # The O value is computed first because it is an input to the key.
    owner_rc4_key = AlgV4.compute_O_value_key(owner_password, revision, key_length)
    owner_value = AlgV4.compute_O_value(owner_rc4_key, user_password, revision)

    file_key = AlgV4.compute_key(
        user_password,
        revision,
        key_length,
        owner_value,
        self.P,
        document_id,
        self.EncryptMetadata,
    )
    user_value = AlgV4.compute_U_value(file_key, revision, document_id)

    self.values.O = owner_value
    self.values.U = user_value
    self._key = file_key

@staticmethod
def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encryption":
filter = encryption_entry.get("/Filter")
Expand Down Expand Up @@ -1166,3 +1278,31 @@ def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encrypti
EFF=EFF,
entry=encryption_entry, # can be deleted?
)

@staticmethod
def make(
    alg: EncryptAlgorithm, permissions: int, first_id_entry: bytes
) -> "Encryption":
    """
    Build an ``Encryption`` instance for the requested algorithm.

    Args:
        alg: Algorithm selector; unpacked into ``V``, ``R`` and ``Length``.
        permissions: Value used for ``P``.
        first_id_entry: Passed through as ``first_id_entry``.

    Returns:
        An ``Encryption`` with ``EncryptMetadata=True``, ``values=None``,
        an empty ``DictionaryObject`` as ``entry``, and the same crypt
        method name for ``StrF``, ``StmF`` and ``EFF``.
    """
    V, R, Length = cast(tuple, alg)

    # All three filters share one method name: "/V2" unless an AES
    # algorithm is selected.
    if alg == EncryptAlgorithm.AES_128:
        crypt_method = "/AESV2"
    elif alg in (EncryptAlgorithm.AES_256_R5, EncryptAlgorithm.AES_256):
        crypt_method = "/AESV3"
    else:
        crypt_method = "/V2"

    return Encryption(
        V=V,
        R=R,
        Length=Length,
        P=permissions,
        EncryptMetadata=True,
        first_id_entry=first_id_entry,
        values=None,
        StrF=crypt_method,
        StmF=crypt_method,
        EFF=crypt_method,
        entry=DictionaryObject(),  # Dummy entry for the moment; will get removed
    )
10 changes: 7 additions & 3 deletions pypdf/generic/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,16 +557,20 @@ def get_original_bytes(self) -> bytes:
else:
raise Exception("no information about original bytes")

def write_to_stream(
self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
def get_encoded_bytes(self) -> bytes:
    """
    Return this string encoded as bytes.

    The result of ``encode_pdfdocencoding`` is preferred. If that raises
    ``UnicodeEncodeError``, the string is encoded as UTF-16BE and prefixed
    with the UTF-16BE byte order mark.
    """
    # PDFDocEncoding is tried first because it is nicer to look at in the
    # PDF file; the attempt costs some performance when it fails.
    try:
        return encode_pdfdocencoding(self)
    except UnicodeEncodeError:
        return codecs.BOM_UTF16_BE + self.encode("utf-16be")

def write_to_stream(
self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
bytearr = self.get_encoded_bytes()
if encryption_key:
from .._security import RC4_encrypt

Expand Down
2 changes: 1 addition & 1 deletion tests/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# qpdf --encrypt "asdfzxcv" "" 40 -- unencrypted.pdf r2-user-password.pdf
("r2-user-password.pdf", False),
# created by:
# qpdf --encrypt "" "asdfzxcv" 40 -- unencrypted.pdf r2-user-password.pdf
# qpdf --encrypt "" "asdfzxcv" 40 -- unencrypted.pdf r2-owner-password.pdf
("r2-owner-password.pdf", False),
# created by:
# qpdf --encrypt "asdfzxcv" "" 128 -- unencrypted.pdf r3-user-password.pdf
Expand Down

0 comments on commit 77e3932

Please sign in to comment.