Skip to content

Commit

Permalink
Migrate PKCS7 backend to Rust (#10228)
Browse files Browse the repository at this point in the history
* Migrate PKCS7 backend to Rust

* Disable PKCS7 functions under BoringSSL

* Misc PKCS7 fixes
  • Loading branch information
facutuesca authored Jan 22, 2024
1 parent d54093e commit 41daf2d
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 72 deletions.
57 changes: 1 addition & 56 deletions src/cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import aead
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
Expand Down Expand Up @@ -863,61 +863,6 @@ def poly1305_supported(self) -> bool:
def pkcs7_supported(self) -> bool:
return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL

def load_pem_pkcs7_certificates(
self, data: bytes
) -> list[x509.Certificate]:
utils._check_bytes("data", data)
bio = self._bytes_to_bio(data)
p7 = self._lib.PEM_read_bio_PKCS7(
bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
)
if p7 == self._ffi.NULL:
self._consume_errors()
raise ValueError("Unable to parse PKCS7 data")

p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
return self._load_pkcs7_certificates(p7)

def load_der_pkcs7_certificates(
self, data: bytes
) -> list[x509.Certificate]:
utils._check_bytes("data", data)
bio = self._bytes_to_bio(data)
p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
if p7 == self._ffi.NULL:
self._consume_errors()
raise ValueError("Unable to parse PKCS7 data")

p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
return self._load_pkcs7_certificates(p7)

def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]:
nid = self._lib.OBJ_obj2nid(p7.type)
self.openssl_assert(nid != self._lib.NID_undef)
if nid != self._lib.NID_pkcs7_signed:
raise UnsupportedAlgorithm(
"Only basic signed structures are currently supported. NID"
f" for this data was {nid}",
_Reasons.UNSUPPORTED_SERIALIZATION,
)

if p7.d.sign == self._ffi.NULL:
raise ValueError(
"The provided PKCS7 has no certificate data, but a cert "
"loading method was called."
)

sk_x509 = p7.d.sign.cert
num = self._lib.sk_X509_num(sk_x509)
certs: list[x509.Certificate] = []
for i in range(num):
x509 = self._lib.sk_X509_value(sk_x509, i)
self.openssl_assert(x509 != self._ffi.NULL)
cert = self._ossl2cert(x509)
certs.append(cert)

return certs


class GetCipherByName:
def __init__(self, fmt: str):
Expand Down
6 changes: 6 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/pkcs7.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ def sign_and_serialize(
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def load_pem_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
def load_der_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
14 changes: 2 additions & 12 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,12 @@
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.utils import _check_byteslike

load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates

def load_pem_pkcs7_certificates(data: bytes) -> list[x509.Certificate]:
from cryptography.hazmat.backends.openssl.backend import backend

return backend.load_pem_pkcs7_certificates(data)


def load_der_pkcs7_certificates(data: bytes) -> list[x509.Certificate]:
from cryptography.hazmat.backends.openssl.backend import backend

return backend.load_der_pkcs7_certificates(data)

load_der_pkcs7_certificates = rust_pkcs7.load_der_pkcs7_certificates

serialize_certificates = rust_pkcs7.serialize_certificates


PKCS7HashTypes = typing.Union[
hashes.SHA224,
hashes.SHA256,
Expand Down
2 changes: 1 addition & 1 deletion src/rust/cryptography-openssl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ rust-version = "1.63.0"

[dependencies]
openssl = "0.10.63"
ffi = { package = "openssl-sys", version = "0.9.91" }
ffi = { package = "openssl-sys", version = "0.9.99" }
foreign-types = "0.3"
foreign-types-shared = "0.1"
95 changes: 93 additions & 2 deletions src/rust/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@ use std::ops::Deref;
use cryptography_x509::csr::Attribute;
use cryptography_x509::{common, oid, pkcs7};
use once_cell::sync::Lazy;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use openssl::pkcs7::Pkcs7;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use pyo3::IntoPy;

use crate::asn1::encode_der_data;
use crate::buf::CffiBuf;
use crate::error::CryptographyResult;
use crate::{types, x509};
use crate::error::{CryptographyError, CryptographyResult};
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use crate::x509::certificate::load_der_x509_certificate;
use crate::{exceptions, types, x509};

const PKCS7_CONTENT_TYPE_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 3);
const PKCS7_MESSAGE_DIGEST_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 4);
Expand Down Expand Up @@ -290,11 +296,96 @@ fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [
}
}

#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
fn load_pkcs7_certificates(
py: pyo3::Python<'_>,
pkcs7: Pkcs7,
) -> CryptographyResult<&pyo3::types::PyList> {
let nid = pkcs7.type_().map(|t| t.nid());
if nid != Some(openssl::nid::Nid::PKCS7_SIGNED) {
let nid_string = nid.map_or("empty".to_string(), |n| n.as_raw().to_string());
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
format!("Only basic signed structures are currently supported. NID for this data was {}", nid_string),
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}

let signed_certificates = pkcs7.signed().and_then(|x| x.certificates());
match signed_certificates {
None => Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PKCS7 has no certificate data, but a cert loading method was called.",
),
)),
Some(certificates) => {
let result = pyo3::types::PyList::empty(py);
for c in certificates {
let cert_der = pyo3::types::PyBytes::new(py, c.to_der()?.as_slice()).into_py(py);
let cert = load_der_x509_certificate(py, cert_der, None)?;
result.append(cert.into_py(py))?;
}
Ok(result)
}
}
}

#[pyo3::prelude::pyfunction]
fn load_pem_pkcs7_certificates<'p>(
py: pyo3::Python<'p>,
data: &[u8],
) -> CryptographyResult<&'p pyo3::types::PyList> {
cfg_if::cfg_if! {
if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_pem(data).map_err(|_| {
CryptographyError::from(pyo3::exceptions::PyValueError::new_err(
"Unable to parse PKCS7 data",
))
})?;
load_pkcs7_certificates(py, pkcs7_decoded)
} else {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
}
}

#[pyo3::prelude::pyfunction]
fn load_der_pkcs7_certificates<'p>(
py: pyo3::Python<'p>,
data: &[u8],
) -> CryptographyResult<&'p pyo3::types::PyList> {
cfg_if::cfg_if! {
if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_der(data).map_err(|_| {
CryptographyError::from(pyo3::exceptions::PyValueError::new_err(
"Unable to parse PKCS7 data",
))
})?;
load_pkcs7_certificates(py, pkcs7_decoded)
} else {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
}
}

pub(crate) fn create_submodule(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelude::PyModule> {
let submod = pyo3::prelude::PyModule::new(py, "pkcs7")?;

submod.add_function(pyo3::wrap_pyfunction!(serialize_certificates, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(sign_and_serialize, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(load_pem_pkcs7_certificates, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(load_der_pkcs7_certificates, submod)?)?;

Ok(submod)
}
Expand Down
2 changes: 1 addition & 1 deletion src/rust/src/x509/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ fn load_pem_x509_certificates(
}

#[pyo3::prelude::pyfunction]
fn load_der_x509_certificate(
pub(crate) fn load_der_x509_certificate(
py: pyo3::Python<'_>,
data: pyo3::Py<pyo3::types::PyBytes>,
backend: Option<&pyo3::PyAny>,
Expand Down
13 changes: 13 additions & 0 deletions tests/hazmat/primitives/test_pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,3 +922,16 @@ def test_invalid_types(self):
certs,
"not an encoding", # type: ignore[arg-type]
)


@pytest.mark.supported(
only_if=lambda backend: not backend.pkcs7_supported(),
skip_message="Requires OpenSSL without PKCS7 support (BoringSSL)",
)
class TestPKCS7Unsupported:
def test_pkcs7_functions_unsupported(self):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
pkcs7.load_der_pkcs7_certificates(b"nonsense")

with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
pkcs7.load_pem_pkcs7_certificates(b"nonsense")

0 comments on commit 41daf2d

Please sign in to comment.