Skip to content

Commit

Permalink
Merge pull request #588 from malancas/add-azure-support
Browse files Browse the repository at this point in the history
Add Azure Signer support
  • Loading branch information
jku authored Jun 1, 2023
2 parents 48b59cb + 36b65a4 commit a4a86db
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 0 deletions.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ ignore_missing_imports = True

[mypy-pyspx.*]
ignore_missing_imports = True

[mypy-azure.*]
ignore_missing_imports = True
2 changes: 2 additions & 0 deletions securesystemslib/signer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
This module provides extensible interfaces for public keys and signers:
Some implementations are provided by default but more can be added by users.
"""
from securesystemslib.signer._azure_signer import AzureSigner
from securesystemslib.signer._gcp_signer import GCPSigner
from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner
from securesystemslib.signer._hsm_signer import HSMSigner
Expand All @@ -30,6 +31,7 @@
GCPSigner.SCHEME: GCPSigner,
HSMSigner.SCHEME: HSMSigner,
GPGSigner.SCHEME: GPGSigner,
AzureSigner.SCHEME: AzureSigner,
}
)

Expand Down
274 changes: 274 additions & 0 deletions securesystemslib/signer/_azure_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
"""Signer implementation for Azure Key Vault"""

import logging
from typing import Optional, Tuple
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.signer._key import Key
from securesystemslib.signer._signer import (
SecretsHandler,
Signature,
Signer,
SSlibKey,
)

AZURE_IMPORT_ERROR = None
try:
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.keyvault.keys import KeyClient, KeyCurveName, KeyVaultKey
from azure.keyvault.keys.crypto import (
CryptographyClient,
SignatureAlgorithm,
)
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import (
encode_dss_signature,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
except ImportError:
AZURE_IMPORT_ERROR = "Signing with Azure Key Vault requires azure-identity, azure-keyvault-keys and cryptography."

logger = logging.getLogger(__name__)


class UnsupportedKeyType(Exception):
pass


class AzureSigner(Signer):
"""Azure Key Vault Signer
This Signer uses Azure Key Vault to sign.
Currently this signer only supports signing with EC keys.
RSA support will be added in a separate pull request.
The specific permissions that AzureSigner needs are:
* "Key Vault Crypto User" for import() and sign()
See https://learn.microsoft.com/en-us/azure/key-vault/general/rbac-guide?tabs=azure-cli
for a list of all built-in Azure Key Vault roles
Arguments:
az_key_uri: Fully qualified Azure Key Vault name, like
https://<vault-name>.vault.azure.net/keys/<key-name>/<version>
public_key: public key object
Raises:
Various errors from azure.identity
Various errors from azure.keyvault.keys
"""

SCHEME = "azurekms"

def __init__(self, az_key_uri: str, public_key: Key):
if AZURE_IMPORT_ERROR:
raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)

try:
cred = DefaultAzureCredential()
self.crypto_client = CryptographyClient(
az_key_uri,
credential=cred,
)
self.signature_algorithm = self._get_signature_algorithm(
public_key,
)
self.hash_algorithm = self._get_hash_algorithm(public_key)
except UnsupportedKeyType as e:
logger.info(
"Key %s has unsupported key type or unsupported elliptic curve"
)
raise e
self.public_key = public_key

@staticmethod
def _get_key_vault_key(
cred: "DefaultAzureCredential",
vault_name: str,
key_name: str,
) -> "KeyVaultKey":
"""Return KeyVaultKey created from the Vault name and key name"""
vault_url = f"https://{vault_name}.vault.azure.net/"

try:
key_client = KeyClient(vault_url=vault_url, credential=cred)
return key_client.get_key(key_name)
except (HttpResponseError,) as e:
logger.info(
"Key %s/%s failed to create key client from credentials, key ID, and Vault URL: %s",
vault_name,
key_name,
str(e),
)
raise e

@staticmethod
def _create_crypto_client(
cred: "DefaultAzureCredential",
kv_key: "KeyVaultKey",
) -> "CryptographyClient":
"""Return CryptographyClient created Azure credentials and a KeyVaultKey"""
try:
return CryptographyClient(kv_key, credential=cred)
except (HttpResponseError,) as e:
logger.info(
"Key %s failed to create crypto client from credentials and KeyVaultKey: %s",
kv_key,
str(e),
)
raise e

@staticmethod
def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm":
"""Return SignatureAlgorithm after parsing the public key"""
if public_key.keytype != "ecdsa":
logger.info("only EC keys are supported for now")
raise UnsupportedKeyType("Supplied key must be an EC key")
# Format is "ecdsa-sha2-nistp256"
comps = public_key.scheme.split("-")
if len(comps) != 3:
raise UnsupportedKeyType("Invalid scheme found")

if comps[2] == "nistp256":
return SignatureAlgorithm.es256
if comps[2] == "nistp384":
return SignatureAlgorithm.es384
if comps[2] == "nistp521":
return SignatureAlgorithm.es512

raise UnsupportedKeyType("Unsupported curve supplied by key")

@staticmethod
def _get_hash_algorithm(public_key: "Key") -> str:
"""Return the hash algorithm used by the public key"""
# Format is "ecdsa-sha2-nistp256"
comps = public_key.scheme.split("-")
if len(comps) != 3:
raise UnsupportedKeyType("Invalid scheme found")

if comps[2] == "nistp256":
return "sha256"
if comps[2] == "nistp384":
return "sha384"
if comps[2] == "nistp521":
return "sha512"

raise UnsupportedKeyType("Unsupported curve supplied by key")

@staticmethod
def _get_keytype_and_scheme(crv: str) -> Tuple[str, str]:
if crv == KeyCurveName.p_256:
return "ecdsa", "ecdsa-sha2-nistp256"
if crv == KeyCurveName.p_384:
return "ecdsa", "ecdsa-sha2-nistp384"
if crv == KeyCurveName.p_521:
return "ecdsa", "ecdsa-sha2-nistp521"

raise UnsupportedKeyType("Unsupported curve supplied by key")

@classmethod
def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "AzureSigner":
uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
raise ValueError(f"AzureSigner does not support {priv_key_uri}")

az_key_uri = priv_key_uri.replace("azurekms:", "https:")
return cls(az_key_uri, public_key)

@classmethod
# pylint: disable=too-many-locals
def import_(cls, az_vault_name: str, az_key_name: str) -> Tuple[str, Key]:
"""Load key and signer details from KMS
Returns the private key uri and the public key. This method should only
be called once per key: the uri and Key should be stored for later use.
"""
if AZURE_IMPORT_ERROR:
raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)

credential = DefaultAzureCredential()
key_vault_key = cls._get_key_vault_key(
credential, az_vault_name, az_key_name
)

if not key_vault_key.key.kty.startswith("EC"):
raise UnsupportedKeyType(
f"Unsupported key type {key_vault_key.key.kty}"
)

if key_vault_key.key.crv == KeyCurveName.p_256:
crv: ec.EllipticCurve = ec.SECP256R1()
elif key_vault_key.key.crv == KeyCurveName.p_384:
crv = ec.SECP384R1()
elif key_vault_key.key.crv == KeyCurveName.p_521:
crv = ec.SECP521R1()
else:
raise UnsupportedKeyType(
f"Unsupported curve type {key_vault_key.key.crv}"
)

# Key is in JWK format, create a curve from it with the parameters
x = int.from_bytes(key_vault_key.key.x, byteorder="big")
y = int.from_bytes(key_vault_key.key.y, byteorder="big")

cpub = ec.EllipticCurvePublicNumbers(x, y, crv)
pub_key = cpub.public_key()
pem = pub_key.public_bytes(
Encoding.PEM, PublicFormat.SubjectPublicKeyInfo
)

keytype, scheme = cls._get_keytype_and_scheme(key_vault_key.key.crv)
keyval = {"public": pem.decode("utf-8")}
keyid = cls._get_keyid(keytype, scheme, keyval)
public_key = SSlibKey(keyid, keytype, scheme, keyval)
priv_key_uri = key_vault_key.key.kid.replace("https:", "azurekms:")

return priv_key_uri, public_key

def sign(self, payload: bytes) -> Signature:
"""Signs payload with Azure Key Vault.
Arguments:
payload: bytes to be signed.
Raises:
Various errors from azure.keyvault.keys.
Returns:
Signature.
"""

hasher = sslib_hash.digest(self.hash_algorithm)
hasher.update(payload)
digest = hasher.digest()
response = self.crypto_client.sign(self.signature_algorithm, digest)

# This code is copied from:
# https://github.com/secure-systems-lab/securesystemslib/blob/135567fa04f10d0c6a4cd32eb45ce736e1f50a93/securesystemslib/signer/_hsm_signer.py#L379
#
# The PKCS11 signature octets correspond to the concatenation of the
# ECDSA values r and s, both represented as an octet string of equal
# length of at most nLen with the most significant byte first (i.e.
# big endian)
# https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178
r_s_len = int(len(response.signature) / 2)
r = int.from_bytes(response.signature[:r_s_len], byteorder="big")
s = int.from_bytes(response.signature[r_s_len:], byteorder="big")

# Create an ASN.1 encoded Dss-Sig-Value to be used with
# pyca/cryptography
dss_sig_value = encode_dss_signature(r, s).hex()

return Signature(self.public_key.keyid, dss_sig_value)
66 changes: 66 additions & 0 deletions tests/check_azure_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
This module confirms that signing using Azure KMS keys works.
The purpose is to do a smoke test, not to exhaustively test every possible
key and environment combination.
For Azure, the requirements to successfully test are:
* Azure authentication details have to be available in the environment
* The key defined in the test has to be available to the authenticated user
NOTE: the filename is purposefully check_ rather than test_ so that tests are
only run when explicitly invoked.
"""

import unittest

from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.signer import AzureSigner, Key, Signer


class TestAzureKeys(unittest.TestCase):
"""Test that KMS keys can be used to sign."""

azure_pubkey = Key.from_dict(
"8b4af6aec66518bc66718474aa15c8becd3286e8e2b958c497a60a828d591d04",
{
"keytype": "ecdsa",
"scheme": "ecdsa-sha2-nistp256",
"keyval": {
"public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE95qxD+/kX6oCace7hrfChtz2IYGK\nHNBmUwtf3wXH0VEdLPWVoFgGITonvA7vxqYrF8ZzAeeZYNyEBbod7SEeaw==\n-----END PUBLIC KEY-----\n"
},
},
)
azure_id = "azurekms://fsn-vault-1.vault.azure.net/keys/ec-key-1/b1089bbf068742d483970282f02090de"

def test_azure_sign(self):
"""Test that Azure KMS key works for signing
Note that this test requires valid credentials available.
"""

data = "data".encode("utf-8")

signer = Signer.from_priv_key_uri(self.azure_id, self.azure_pubkey)
sig = signer.sign(data)

print(sig.signature)

self.azure_pubkey.verify_signature(sig, data)
with self.assertRaises(UnverifiedSignatureError):
self.azure_pubkey.verify_signature(sig, b"NOT DATA")

def test_azure_import(self):
"""Test that Azure KMS key works for signing
Note that this test requires valid credentials available.
"""

uri, pubkey = AzureSigner.import_("fsn-vault-1", "ec-key-1")

self.assertEqual(pubkey, self.azure_pubkey)
self.assertEqual(uri, self.azure_id)


if __name__ == "__main__":
unittest.main(verbosity=1, buffer=True)

0 comments on commit a4a86db

Please sign in to comment.