Skip to content

Commit

Permalink
Merge pull request #124 from nabla-c0d3/pr-118
Browse files Browse the repository at this point in the history
Add functions to get/set signature algorithms
  • Loading branch information
nabla-c0d3 authored Dec 26, 2024
2 parents ba0d542 + aa7466f commit 14dfe18
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 1 deletion.
66 changes: 66 additions & 0 deletions nassl/_nassl/nassl_SSL.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,66 @@ static PyObject* nassl_SSL_set_ciphersuites(nassl_SSL_Object *self, PyObject *ar
}


// SSL_set1_sigalgs() is only available in OpenSSL 1.1.1
static PyObject* nassl_SSL_set1_sigalgs(nassl_SSL_Object *self, PyObject *args)
{
int i = 0;
PyObject *pyListOfOpensslNids;
Py_ssize_t nidsCount = 0;
int *listOfNids;

// Parse the Python list
if (!PyArg_ParseTuple(args, "O!", &PyList_Type, &pyListOfOpensslNids))
{
return NULL;
}

// Extract each NID int from the list
nidsCount = PyList_Size(pyListOfOpensslNids);
listOfNids = (int *) PyMem_Malloc(nidsCount * sizeof(int));
if (listOfNids == NULL)
{
return PyErr_NoMemory();
}

for (i=0; i<nidsCount; i++)
{
PyObject *pyNid;
int nid;

pyNid = PyList_GetItem(pyListOfOpensslNids, i);
if ((pyNid == NULL) || (!PyLong_Check(pyNid)))
{
PyMem_Free(listOfNids);
return NULL;
}
nid = PyLong_AsSize_t(pyNid);
listOfNids[i] = nid;
}

if (SSL_set1_sigalgs(self->ssl, listOfNids, nidsCount) != 1)
{
PyMem_Free(listOfNids);
return raise_OpenSSL_error();
}

PyMem_Free(listOfNids);
Py_RETURN_NONE;
}


static PyObject* nassl_get_peer_signature_nid(nassl_SSL_Object *self)
{
int psig_nid;

if(SSL_get_peer_signature_nid(self->ssl, &psig_nid) != 1)
{
return raise_OpenSSL_error();
}
return PyLong_FromUnsignedLong((long)psig_nid);
}


static PyObject* nassl_SSL_get0_verified_chain(nassl_SSL_Object *self, PyObject *args)
{
STACK_OF(X509) *verifiedCertChain = NULL;
Expand Down Expand Up @@ -1187,6 +1247,12 @@ static PyMethodDef nassl_SSL_Object_methods[] =
{"set_ciphersuites", (PyCFunction)nassl_SSL_set_ciphersuites, METH_VARARGS,
"OpenSSL's SSL_set_ciphersuites()."
},
{"set1_sigalgs", (PyCFunction)nassl_SSL_set1_sigalgs, METH_VARARGS,
"OpenSSL's SSL_set1_sigalgs()."
},
{"get_peer_signature_nid", (PyCFunction)nassl_get_peer_signature_nid, METH_NOARGS,
"OpenSSL's get_peer_signature_nid(). Returns a digest NID"
},
{"get0_verified_chain", (PyCFunction)nassl_SSL_get0_verified_chain, METH_NOARGS,
"OpenSSL's SSL_get0_verified_chain(). Returns an array of _nassl.X509 objects."
},
Expand Down
6 changes: 6 additions & 0 deletions nassl/ephemeral_key_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class OpenSslEvpPkeyEnum(IntEnum):
EC = 408
X25519 = 1034
X448 = 1035
RSA = 6
DSA = 116
RSA_PSS = 912


class OpenSslEcNidEnum(IntEnum):
Expand Down Expand Up @@ -84,6 +87,9 @@ def get_supported_by_ssl_client(cls) -> List["OpenSslEcNidEnum"]:
OpenSslEvpPkeyEnum.EC: "ECDH",
OpenSslEvpPkeyEnum.X25519: "ECDH",
OpenSslEvpPkeyEnum.X448: "ECDH",
OpenSslEvpPkeyEnum.RSA: "RSA",
OpenSslEvpPkeyEnum.DSA: "DSA",
OpenSslEvpPkeyEnum.RSA_PSS: "RSA-PSS",
}


Expand Down
24 changes: 23 additions & 1 deletion nassl/ssl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from nassl._nassl import WantReadError, OpenSSLError, WantX509LookupError

from enum import IntEnum
from typing import List, Any
from typing import List, Any, Tuple

from typing import Protocol

Expand All @@ -32,6 +32,17 @@ class OpenSslVerifyEnum(IntEnum):
CLIENT_ONCE = 4


class OpenSslDigestNidEnum(IntEnum):
"""SSL digest algorithms used for the signature algorithm, per obj_mac.h."""

MD5 = 4
SHA1 = 64
SHA224 = 675
SHA256 = 672
SHA384 = 673
SHA512 = 674


class OpenSslVersionEnum(IntEnum):
"""SSL version constants."""

Expand Down Expand Up @@ -455,6 +466,17 @@ def set_ciphersuites(self, cipher_suites: str) -> None:
# TODO(AD): Eventually merge this method with get/set_cipher_list()
self._ssl.set_ciphersuites(cipher_suites)

def set_signature_algorithms(self, algorithms: List[Tuple[OpenSslDigestNidEnum, OpenSslEvpPkeyEnum]]) -> None:
"""Set the enabled signature algorithms for the key exchange.
The algorithms parameter is a list of a public key algorithm and a digest."""
flattened_sigalgs = [item for sublist in algorithms for item in sublist]
self._ssl.set1_sigalgs(flattened_sigalgs)

def get_peer_signature_nid(self) -> OpenSslDigestNidEnum:
"""Get the digest used for TLS message signing."""
return OpenSslDigestNidEnum(self._ssl.get_peer_signature_nid())

def set_groups(self, supported_groups: List[OpenSslEcNidEnum]) -> None:
"""Specify elliptic curves or DH groups that are supported by the client in descending order."""
self._ssl.set1_groups(supported_groups)
Expand Down
60 changes: 60 additions & 0 deletions tests/ssl_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SslClient,
OpenSSLError,
OpenSslEarlyDataStatusEnum,
OpenSslDigestNidEnum,
)
from nassl.ephemeral_key_info import (
OpenSslEvpPkeyEnum,
Expand Down Expand Up @@ -218,6 +219,7 @@ def test_get_verified_chain(self) -> None:

# And when requesting the verified certificate chain, it returns it
assert ssl_client.get_verified_chain()

finally:
ssl_client.shutdown()

Expand Down Expand Up @@ -361,16 +363,20 @@ def test_set_groups_curve_x448(self) -> None:
assert len(dh_info.public_bytes) == 56

def test_get_extended_master_secret_not_used(self) -> None:
# Given a TLS server that does NOT support the Extended Master Secret extension
with LegacyOpenSslServer() as server:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((server.hostname, server.port))

# When a client connects to it
ssl_client = SslClient(
ssl_version=OpenSslVersionEnum.TLSV1_2,
underlying_socket=sock,
ssl_verify=OpenSslVerifyEnum.NONE,
)

# Then, before the handshake, the client cannot tell if Extended Master Secret was used
exms_support_before_handshake = ssl_client.get_extended_master_secret_support()
assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN

Expand All @@ -379,29 +385,83 @@ def test_get_extended_master_secret_not_used(self) -> None:
finally:
ssl_client.shutdown()

# And after the handshake, the client can tell that Extended Master Secret was NOT used
exms_support = ssl_client.get_extended_master_secret_support()
assert exms_support == ExtendedMasterSecretSupportEnum.NOT_USED_IN_CURRENT_SESSION

def test_get_extended_master_secret_used(self) -> None:
# Given a TLS server that DOES support the Extended Master Secret extension
with ModernOpenSslServer() as server:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((server.hostname, server.port))

# When a client connects to it
ssl_client = SslClient(
ssl_version=OpenSslVersionEnum.TLSV1_2,
underlying_socket=sock,
ssl_verify=OpenSslVerifyEnum.NONE,
)

# Then, before the handshake, the client cannot tell if Extended Master Secret was used
exms_support_before_handshake = ssl_client.get_extended_master_secret_support()
assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN

try:
ssl_client.do_handshake()
finally:
ssl_client.shutdown()

# And after the handshake, the client can tell that Extended Master Secret was used
exms_support = ssl_client.get_extended_master_secret_support()
assert exms_support == ExtendedMasterSecretSupportEnum.USED_IN_CURRENT_SESSION

def test_set_signature_algorithms(self) -> None:
# Given a TLS server
with ModernOpenSslServer() as server:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((server.hostname, server.port))

# And a client
ssl_client = SslClient(
ssl_version=OpenSslVersionEnum.TLSV1_2,
underlying_socket=sock,
ssl_verify=OpenSslVerifyEnum.NONE,
)
# That's configured to use a specific signature algorithm
ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA256, OpenSslEvpPkeyEnum.RSA)])

# When the client connects to the server, it succeeds
try:
ssl_client.do_handshake()
finally:
ssl_client.shutdown()

# And the configured signature algorithm was used
assert ssl_client.get_peer_signature_nid() == OpenSslDigestNidEnum.SHA256

def test_set_signature_algorithms_but_not_supported(self) -> None:
# Given a TLS server
with ModernOpenSslServer() as server:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((server.hostname, server.port))

# And a client
ssl_client = SslClient(
ssl_version=OpenSslVersionEnum.TLSV1_3,
underlying_socket=sock,
ssl_verify=OpenSslVerifyEnum.NONE,
)
# That's configured to use signature algorithms that are NOT supported
ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA512, OpenSslEvpPkeyEnum.EC)])

# Then, when the client connects to the server, the handshake fails
with pytest.raises(OpenSSLError, match="handshake failure"):
ssl_client.do_handshake()
ssl_client.shutdown()


class TestLegacySslClientOnline:
def test_ssl_2(self) -> None:
Expand Down

0 comments on commit 14dfe18

Please sign in to comment.