Skip to content

Commit

Permalink
backport smime fix (#8390)
Browse files Browse the repository at this point in the history
* fixes #8298 -- correctly generate content-type header in PKCS#7 SMIME (#8389)

* add changelog

* better changelog

---------

Co-authored-by: Alex Gaynor <alex.gaynor@gmail.com>
  • Loading branch information
reaperhulk and alex authored Feb 26, 2023
1 parent d6951dc commit a69fe98
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 51 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

.. _v39-0-2:

39.0.2 - 2023-03-02
~~~~~~~~~~~~~~~~~~~

* Fixed a bug where the content type header was not properly encoded for
PKCS7 signatures when using the ``Text`` option and ``SMIME`` encoding.

.. _v39-0-1:

39.0.1 - 2023-02-07
Expand Down
23 changes: 19 additions & 4 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import email.base64mime
import email.generator
import email.message
import email.policy
import io
import typing

Expand Down Expand Up @@ -177,7 +178,9 @@ def sign(
return rust_pkcs7.sign_and_serialize(self, encoding, options)


def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:
def _smime_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
# This function works pretty hard to replicate what OpenSSL does
# precisely. For good and for ill.

Expand All @@ -192,9 +195,10 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:

m.preamble = "This is an S/MIME signed message\n"

msg_part = email.message.MIMEPart()
msg_part = OpenSSLMimePart()
msg_part.set_payload(data)
msg_part.add_header("Content-Type", "text/plain")
if text_mode:
msg_part.add_header("Content-Type", "text/plain")
m.attach(msg_part)

sig_part = email.message.MIMEPart()
Expand All @@ -213,7 +217,18 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:

fp = io.BytesIO()
g = email.generator.BytesGenerator(
fp, maxheaderlen=0, mangle_from_=False, policy=m.policy
fp,
maxheaderlen=0,
mangle_from_=False,
policy=m.policy.clone(linesep="\r\n"),
)
g.flatten(m)
return fp.getvalue()


class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.
def _write_headers(self, generator) -> None:
if list(self.raw_items()):
generator._write_headers(self)
123 changes: 81 additions & 42 deletions src/rust/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ fn sign_and_serialize<'p>(
.getattr(crate::intern!(py, "PKCS7Options"))?;

let raw_data = builder.getattr(crate::intern!(py, "_data"))?.extract()?;
let data = if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? {
Cow::Borrowed(raw_data)
} else {
smime_canonicalize(
raw_data,
options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?,
)
};
let text_mode = options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?;
let (data_with_header, data_without_header) =
if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? {
(Cow::Borrowed(raw_data), Cow::Borrowed(raw_data))
} else {
smime_canonicalize(raw_data, text_mode)
};

let content_type_bytes = asn1::write_single(&PKCS7_DATA_OID)?;
let signing_time_bytes = asn1::write_single(&x509::certificate::time_from_chrono(
Expand Down Expand Up @@ -180,7 +179,7 @@ fn sign_and_serialize<'p>(
{
(
None,
x509::sign::sign_data(py, py_private_key, py_hash_alg, &data)?,
x509::sign::sign_data(py, py_private_key, py_hash_alg, &data_with_header)?,
)
} else {
let mut authenticated_attrs = vec![];
Expand All @@ -198,7 +197,8 @@ fn sign_and_serialize<'p>(
])),
});

let digest = asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data)?)?;
let digest =
asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data_with_header)?)?;
// Gross hack: copy to PyBytes to extend the lifetime to 'p
let digest_bytes = pyo3::types::PyBytes::new(py, &digest);
authenticated_attrs.push(x509::csr::Attribute {
Expand Down Expand Up @@ -264,7 +264,7 @@ fn sign_and_serialize<'p>(
if options.contains(pkcs7_options.getattr(crate::intern!(py, "DetachedSignature"))?)? {
None
} else {
data_tlv_bytes = asn1::write_single(&data.deref())?;
data_tlv_bytes = asn1::write_single(&data_with_header.deref())?;
Some(asn1::parse_single(&data_tlv_bytes).unwrap())
};

Expand All @@ -290,7 +290,7 @@ fn sign_and_serialize<'p>(
content_type: PKCS7_SIGNED_DATA_OID,
content: Some(asn1::parse_single(&signed_data_bytes).unwrap()),
};
let content_info_bytes = asn1::write_single(&content_info)?;
let ci_bytes = asn1::write_single(&content_info)?;

let encoding_class = py
.import("cryptography.hazmat.primitives.serialization")?
Expand All @@ -302,43 +302,49 @@ fn sign_and_serialize<'p>(
.map(|d| OIDS_TO_MIC_NAME[&d.oid])
.collect::<Vec<_>>()
.join(",");
Ok(py
let smime_encode = py
.import("cryptography.hazmat.primitives.serialization.pkcs7")?
.getattr(crate::intern!(py, "_smime_encode"))?
.call1((
pyo3::types::PyBytes::new(py, &data),
pyo3::types::PyBytes::new(py, &content_info_bytes),
mic_algs,
))?
.getattr(crate::intern!(py, "_smime_encode"))?;
Ok(smime_encode
.call1((&*data_without_header, &*ci_bytes, mic_algs, text_mode))?
.extract()?)
} else {
// Handles the DER, PEM, and error cases
encode_der_data(py, "PKCS7".to_string(), content_info_bytes, encoding)
encode_der_data(py, "PKCS7".to_string(), ci_bytes, encoding)
}
}

fn smime_canonicalize(data: &[u8], text_mode: bool) -> Cow<'_, [u8]> {
let mut new_data = vec![];
fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [u8]>) {
let mut new_data_with_header = vec![];
let mut new_data_without_header = vec![];
if text_mode {
new_data.extend_from_slice(b"Content-Type: text/plain\r\n\r\n");
new_data_with_header.extend_from_slice(b"Content-Type: text/plain\r\n\r\n");
}

let mut last_idx = 0;
for (i, c) in data.iter().copied().enumerate() {
if c == b'\n' && (i == 0 || data[i - 1] != b'\r') {
new_data.extend_from_slice(&data[last_idx..i]);
new_data.push(b'\r');
new_data.push(b'\n');
new_data_with_header.extend_from_slice(&data[last_idx..i]);
new_data_with_header.push(b'\r');
new_data_with_header.push(b'\n');

new_data_without_header.extend_from_slice(&data[last_idx..i]);
new_data_without_header.push(b'\r');
new_data_without_header.push(b'\n');
last_idx = i + 1;
}
}
// If there's stuff in new_data, that means we need to copy the rest of
// data over.
if !new_data.is_empty() {
new_data.extend_from_slice(&data[last_idx..]);
Cow::Owned(new_data)
if !new_data_with_header.is_empty() {
new_data_with_header.extend_from_slice(&data[last_idx..]);
new_data_without_header.extend_from_slice(&data[last_idx..]);
(
Cow::Owned(new_data_with_header),
Cow::Owned(new_data_without_header),
)
} else {
Cow::Borrowed(data)
(Cow::Borrowed(data), Cow::Borrowed(data))
}
}

Expand All @@ -359,27 +365,60 @@ mod tests {

#[test]
fn test_smime_canonicalize() {
for (input, text_mode, expected, expected_is_borrowed) in [
for (
input,
text_mode,
expected_with_header,
expected_without_header,
expected_is_borrowed,
) in [
// Values with text_mode=false
(b"" as &[u8], false, b"" as &[u8], true),
(b"\n", false, b"\r\n", false),
(b"abc", false, b"abc", true),
(b"abc\r\ndef\n", false, b"abc\r\ndef\r\n", false),
(b"abc\r\n", false, b"abc\r\n", true),
(b"abc\ndef\n", false, b"abc\r\ndef\r\n", false),
(b"" as &[u8], false, b"" as &[u8], b"" as &[u8], true),
(b"\n", false, b"\r\n", b"\r\n", false),
(b"abc", false, b"abc", b"abc", true),
(
b"abc\r\ndef\n",
false,
b"abc\r\ndef\r\n",
b"abc\r\ndef\r\n",
false,
),
(b"abc\r\n", false, b"abc\r\n", b"abc\r\n", true),
(
b"abc\ndef\n",
false,
b"abc\r\ndef\r\n",
b"abc\r\ndef\r\n",
false,
),
// Values with text_mode=true
(b"", true, b"Content-Type: text/plain\r\n\r\n", false),
(b"abc", true, b"Content-Type: text/plain\r\n\r\nabc", false),
(b"", true, b"Content-Type: text/plain\r\n\r\n", b"", false),
(
b"abc",
true,
b"Content-Type: text/plain\r\n\r\nabc",
b"abc",
false,
),
(
b"abc\n",
true,
b"Content-Type: text/plain\r\n\r\nabc\r\n",
b"abc\r\n",
false,
),
] {
let result = smime_canonicalize(input, text_mode);
assert_eq!(result.deref(), expected);
assert_eq!(matches!(result, Cow::Borrowed(_)), expected_is_borrowed);
let (result_with_header, result_without_header) = smime_canonicalize(input, text_mode);
assert_eq!(result_with_header.deref(), expected_with_header);
assert_eq!(result_without_header.deref(), expected_without_header);
assert_eq!(
matches!(result_with_header, Cow::Borrowed(_)),
expected_is_borrowed
);
assert_eq!(
matches!(result_without_header, Cow::Borrowed(_)),
expected_is_borrowed
);
}
}
}
25 changes: 20 additions & 5 deletions tests/hazmat/primitives/test_pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# for complete details.


import email.parser
import os
import typing

Expand Down Expand Up @@ -289,6 +290,7 @@ def test_smime_sign_detached(self, backend):

sig = builder.sign(serialization.Encoding.SMIME, options)
sig_binary = builder.sign(serialization.Encoding.DER, options)
assert b"text/plain" not in sig
# We don't have a generic ASN.1 parser available to us so we instead
# will assert on specific byte sequences being present based on the
# parameters chosen above.
Expand All @@ -298,8 +300,17 @@ def test_smime_sign_detached(self, backend):
# as a separate section before the PKCS7 data. So we should expect to
# have data in sig but not in sig_binary
assert data in sig
# Parse the message to get the signed data, which is the
# first payload in the message
message = email.parser.BytesParser().parsebytes(sig)
signed_data = message.get_payload()[0].get_payload().encode()
_pkcs7_verify(
serialization.Encoding.SMIME, sig, data, [cert], options, backend
serialization.Encoding.SMIME,
sig,
signed_data,
[cert],
options,
backend,
)
assert data not in sig_binary
_pkcs7_verify(
Expand Down Expand Up @@ -492,10 +503,14 @@ def test_sign_text(self, backend):
# The text option adds text/plain headers to the S/MIME message
# These headers are only relevant in SMIME mode, not binary, which is
# just the PKCS7 structure itself.
assert b"text/plain" in sig_pem
# When passing the Text option the header is prepended so the actual
# signed data is this.
signed_data = b"Content-Type: text/plain\r\n\r\nhello world"
assert sig_pem.count(b"text/plain") == 1
assert b"Content-Type: text/plain\r\n\r\nhello world\r\n" in sig_pem
# Parse the message to get the signed data, which is the
# first payload in the message
message = email.parser.BytesParser().parsebytes(sig_pem)
signed_data = message.get_payload()[0].as_bytes(
policy=message.policy.clone(linesep="\r\n")
)
_pkcs7_verify(
serialization.Encoding.SMIME,
sig_pem,
Expand Down

0 comments on commit a69fe98

Please sign in to comment.