diff --git a/nassl/ephemeral_key_info.py b/nassl/ephemeral_key_info.py index d33d1be..6452cd4 100644 --- a/nassl/ephemeral_key_info.py +++ b/nassl/ephemeral_key_info.py @@ -87,6 +87,9 @@ def get_supported_by_ssl_client(cls) -> List["OpenSslEcNidEnum"]: OpenSslEvpPkeyEnum.EC: "ECDH", OpenSslEvpPkeyEnum.X25519: "ECDH", OpenSslEvpPkeyEnum.X448: "ECDH", + OpenSslEvpPkeyEnum.RSA: "RSA", + OpenSslEvpPkeyEnum.DSA: "DSA", + OpenSslEvpPkeyEnum.RSA_PSS: "RSA-PSS", } diff --git a/nassl/ssl_client.py b/nassl/ssl_client.py index 106d263..acd5466 100755 --- a/nassl/ssl_client.py +++ b/nassl/ssl_client.py @@ -466,9 +466,11 @@ def set_ciphersuites(self, cipher_suites: str) -> None: # TODO(AD): Eventually merge this method with get/set_cipher_list() self._ssl.set_ciphersuites(cipher_suites) - def set_sigalgs(self, sigalgs: List[Tuple[OpenSslDigestNidEnum, OpenSslEvpPkeyEnum]]) -> None: - """Set the enabled signature algorithms for the key exchange.""" - flattened_sigalgs = [item for sublist in sigalgs for item in sublist] + def set_signature_algorithms(self, algorithms: List[Tuple[OpenSslDigestNidEnum, OpenSslEvpPkeyEnum]]) -> None: + """Set the enabled signature algorithms for the key exchange. + + The algorithms parameter is a list of a public key algorithm and a digest.""" + flattened_sigalgs = [item for sublist in algorithms for item in sublist] self._ssl.set1_sigalgs(flattened_sigalgs) def get_peer_signature_nid(self) -> OpenSslDigestNidEnum: diff --git a/tests/ssl_client_test.py b/tests/ssl_client_test.py index 77a274a..60fc9c0 100644 --- a/tests/ssl_client_test.py +++ b/tests/ssl_client_test.py @@ -12,7 +12,8 @@ OpenSslVerifyEnum, SslClient, OpenSSLError, - OpenSslEarlyDataStatusEnum, OpenSslDigestNidEnum, + OpenSslEarlyDataStatusEnum, + OpenSslDigestNidEnum, ) from nassl.ephemeral_key_info import ( OpenSslEvpPkeyEnum, @@ -219,7 +220,6 @@ def test_get_verified_chain(self) -> None: # And when requesting the verified certificate chain, it returns it assert ssl_client.get_verified_chain() - assert ssl_client.get_peer_signature_nid() == OpenSslDigestNidEnum.SHA256 finally: ssl_client.shutdown() @@ -363,16 +363,20 @@ def test_set_groups_curve_x448(self) -> None: assert len(dh_info.public_bytes) == 56 def test_get_extended_master_secret_not_used(self) -> None: + # Given a TLS server that does NOT support the Extended Master Secret extension with LegacyOpenSslServer() as server: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((server.hostname, server.port)) + # When a client connects to it ssl_client = SslClient( ssl_version=OpenSslVersionEnum.TLSV1_2, underlying_socket=sock, ssl_verify=OpenSslVerifyEnum.NONE, ) + + # Then, before the handshake, the client cannot tell if Extended Master Secret was used exms_support_before_handshake = ssl_client.get_extended_master_secret_support() assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN @@ -381,29 +385,83 @@ def test_get_extended_master_secret_not_used(self) -> None: finally: ssl_client.shutdown() + # And after the handshake, the client can tell that Extended Master Secret was NOT used exms_support = ssl_client.get_extended_master_secret_support() assert exms_support == ExtendedMasterSecretSupportEnum.NOT_USED_IN_CURRENT_SESSION def test_get_extended_master_secret_used(self) -> None: + # Given a TLS server that DOES support the Extended Master Secret extension with ModernOpenSslServer() as server: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((server.hostname, server.port)) + # When a client connects to it ssl_client = SslClient( ssl_version=OpenSslVersionEnum.TLSV1_2, underlying_socket=sock, ssl_verify=OpenSslVerifyEnum.NONE, ) + # Then, before the handshake, the client cannot tell if Extended Master Secret was used + exms_support_before_handshake = ssl_client.get_extended_master_secret_support() + assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN + try: ssl_client.do_handshake() finally: ssl_client.shutdown() + # And after the handshake, the client can tell that Extended Master Secret was used exms_support = ssl_client.get_extended_master_secret_support() assert exms_support == ExtendedMasterSecretSupportEnum.USED_IN_CURRENT_SESSION + def test_set_signature_algorithms(self) -> None: + # Given a TLS server + with ModernOpenSslServer() as server: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect((server.hostname, server.port)) + + # And a client + ssl_client = SslClient( + ssl_version=OpenSslVersionEnum.TLSV1_2, + underlying_socket=sock, + ssl_verify=OpenSslVerifyEnum.NONE, + ) + # That's configured to use a specific signature algorithm + ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA256, OpenSslEvpPkeyEnum.RSA)]) + + # When the client connects to the server, it succeeds + try: + ssl_client.do_handshake() + finally: + ssl_client.shutdown() + + # And the configured signature algorithm was used + assert ssl_client.get_peer_signature_nid() == OpenSslDigestNidEnum.SHA256 + + def test_set_signature_algorithms_but_not_supported(self) -> None: + # Given a TLS server + with ModernOpenSslServer() as server: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect((server.hostname, server.port)) + + # And a client + ssl_client = SslClient( + ssl_version=OpenSslVersionEnum.TLSV1_3, + underlying_socket=sock, + ssl_verify=OpenSslVerifyEnum.NONE, + ) + # That's configured to use signature algorithms that are NOT supported + ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA512, OpenSslEvpPkeyEnum.EC)]) + + # Then, when the client connects to the server, the handshake fails + with pytest.raises(OpenSSLError, match="handshake failure"): + ssl_client.do_handshake() + ssl_client.shutdown() + class TestLegacySslClientOnline: def test_ssl_2(self) -> None: @@ -469,26 +527,6 @@ def test_set_ciphersuites(self) -> None: # And client's cipher suite was used assert "TLS_CHACHA20_POLY1305_SHA256" == ssl_client.get_current_cipher_name() - def test_set_sigalgs(self): - with ModernOpenSslServer() as server: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5) - sock.connect((server.hostname, server.port)) - - ssl_client = SslClient( - ssl_version=OpenSslVersionEnum.TLSV1_3, - underlying_socket=sock, - ssl_verify=OpenSslVerifyEnum.NONE, - ) - # These signature algorithms are unsupported - ssl_client.set_sigalgs([ - (OpenSslDigestNidEnum.SHA512, OpenSslEvpPkeyEnum.EC) - ]) - - with pytest.raises(OpenSSLError): - ssl_client.do_handshake() - ssl_client.shutdown() - @staticmethod def _create_tls_1_3_session(server_host: str, server_port: int) -> _nassl.SSL_SESSION: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)