Skip to content

Commit

Permalink
samples: Add Python code snippets for importing-a-key doc (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolfowitz-google authored and rsamborski committed Nov 14, 2022
1 parent 9d82d9a commit d8e0fdf
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 1 deletion.
41 changes: 41 additions & 0 deletions kms/snippets/check_state_import_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_check_state_import_job]
def check_state_import_job(project_id, location_id, key_ring_id, import_job_id):
"""
Check the state of an import job in Cloud KMS.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified import_job string.
import_job_name = client.import_job_path(
project_id, location_id, key_ring_id, import_job_id)

# Retrieve the state from an existing import job.
import_job = client.get_import_job(name=import_job_name)

print('Current state of import job {}: {}'.format(import_job.name, import_job.state))
# [END kms_check_state_import_job]
41 changes: 41 additions & 0 deletions kms/snippets/check_state_imported_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_check_state_imported_key]
def check_state_imported_key(project_id, location_id, key_ring_id, import_job_id):
"""
Check the state of an import job in Cloud KMS.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified import_job string.
import_job_name = client.import_job_path(
project_id, location_id, key_ring_id, import_job_id)

# Retrieve the state from an existing import job.
import_job = client.get_import_job(name=import_job_name)

print('Current state of import job {}: {}'.format(import_job.name, import_job.state))
# [END kms_check_state_imported_key]
44 changes: 44 additions & 0 deletions kms/snippets/create_import_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_import_job]
def create_import_job(project_id, location_id, key_ring_id, import_job_id):
"""
Create a new import job in Cloud KMS.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified key_ring string.
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

# Set paramaters for the import job, allowed values for ImportMethod and ProtectionLevel found here:
# https://googleapis.dev/python/cloudkms/latest/_modules/google/cloud/kms_v1/types/resources.html
import_job_params = {"import_method": kms.ImportJob.ImportMethod.RSA_OAEP_3072_SHA1_AES_256, "protection_level": kms.ProtectionLevel.HSM}

# Call the client to create a new import job.
import_job = client.create_import_job({"parent": key_ring_name, "import_job_id": import_job_id, "import_job": import_job_params})

print('Created import job: {}'.format(import_job.name))
# [END kms_create_import_job]
67 changes: 67 additions & 0 deletions kms/snippets/create_key_for_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_for_import]
def create_key_for_import(project_id, location_id, key_ring_id, crypto_key_id):
"""
Generate Cloud KMS-compatible key material locally and sets up an empty CryptoKey within a KeyRing for import.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
crypto_key_id (string): ID of the key to import (e.g. 'my-asymmetric-signing-key').
"""

# Import Python standard cryptographic libraries.
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec

# Import the client library.
from google.cloud import kms

# Generate some key material in Python and format it in PKCS #8 DER as
# required by Google Cloud KMS.
key = ec.generate_private_key(ec.SECP256R1, default_backend())
formatted_key = key.private_bytes(
serialization.Encoding.DER,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption())

print('Generated key bytes: {}'.format(formatted_key))

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the key. For more information regarding allowed values of these fields, see:
# https://googleapis.dev/python/cloudkms/latest/_modules/google/cloud/kms_v1/types/resources.html
purpose = kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256
protection_level = kms.ProtectionLevel.HSM
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
'protection_level': protection_level
}
}

# Build the parent key ring name.
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

# Call the API.
created_key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': crypto_key_id, 'crypto_key': key})
print('Created hsm key: {}'.format(created_key.name))
# [END kms_create_key_for_import]
74 changes: 74 additions & 0 deletions kms/snippets/import_manually_wrapped_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_import_manually_wrapped_key]
def import_manually_wrapped_key(project_id, location_id, key_ring_id, crypto_key_id, import_job_id, key_material):
"""
Imports local key material to Cloud KMS.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
crypto_key_id (string): ID of the key to import (e.g. 'my-asymmetric-signing-key').
import_job_id (string): ID of the import job (e.g. 'my-import-job').
key_material (bytes): Locally generated key material in PKCS #8 DER format.
Returns:
CryptoKeyVersion: An instance of the imported key in Cloud KMS.
"""

# Import the client library and Python standard cryptographic libraries.
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, keywrap, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified crypto_key and import_job string.
crypto_key_name = client.crypto_key_path(
project_id, location_id, key_ring_id, crypto_key_id)
import_job_name = client.import_job_path(
project_id, location_id, key_ring_id, import_job_id)

# Generate a temporary 32-byte key for AES-KWP and wrap the key material.
kwp_key = os.urandom(32)
wrapped_target_key = keywrap.aes_key_wrap_with_padding(
kwp_key, key_material, default_backend())

# Retrieve the public key from the import job.
import_job = client.get_import_job(name=import_job_name)
import_job_pub = serialization.load_pem_public_key(
bytes(import_job.public_key.pem, 'UTF-8'), default_backend())

# Wrap the KWP key using the import job key.
wrapped_kwp_key = import_job_pub.encrypt(
kwp_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))

# Import the wrapped key material.
client.import_crypto_key_version({
"parent": crypto_key_name,
"import_job": import_job_name,
"algorithm": kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256,
"rsa_aes_wrapped_key": wrapped_kwp_key + wrapped_target_key,
})

print('Imported: {}'.format(import_job.name))
# [END kms_import_manually_wrapped_key]
52 changes: 51 additions & 1 deletion kms/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, utils
from cryptography.hazmat.primitives.asymmetric import ec, padding, utils
from google.cloud import kms
import pytest

from check_state_import_job import check_state_import_job
from check_state_imported_key import check_state_imported_key
from create_import_job import create_import_job
from create_key_asymmetric_decrypt import create_key_asymmetric_decrypt
from create_key_asymmetric_sign import create_key_asymmetric_sign
from create_key_for_import import create_key_for_import
from create_key_hsm import create_key_hsm
from create_key_labels import create_key_labels
from create_key_ring import create_key_ring
Expand All @@ -45,6 +49,7 @@
from iam_add_member import iam_add_member
from iam_get_policy import iam_get_policy
from iam_remove_member import iam_remove_member
from import_manually_wrapped_key import import_manually_wrapped_key
from quickstart import quickstart
from restore_key_version import restore_key_version
from sign_asymmetric import sign_asymmetric
Expand Down Expand Up @@ -72,6 +77,16 @@ def location_id():
return "us-east1"


@pytest.fixture(scope="module")
def import_job_id():
return "my-import-job"


@pytest.fixture(scope="module")
def import_tests_key_id():
return "my-import-job-ec-key"


@pytest.fixture(scope="module")
def key_ring_id(client, project_id, location_id):
location_name = f"projects/{project_id}/locations/{location_id}"
Expand Down Expand Up @@ -176,6 +191,24 @@ def wait_for_ready(client, key_version_name):
pytest.fail('{} not ready'.format(key_version_name))


def test_create_import_job(project_id, location_id, key_ring_id, import_job_id, capsys):
create_import_job(project_id, location_id, key_ring_id, import_job_id)
out, _ = capsys.readouterr()
assert "Created import job" in out


def test_check_state_import_job(project_id, location_id, key_ring_id, import_job_id, capsys):
check_state_import_job(project_id, location_id, key_ring_id, import_job_id)
out, _ = capsys.readouterr()
assert "Current state" in out


def test_check_state_imported_key(project_id, location_id, key_ring_id, import_job_id, capsys):
check_state_imported_key(project_id, location_id, key_ring_id, import_job_id)
out, _ = capsys.readouterr()
assert "Current state" in out


def test_create_key_asymmetric_decrypt(project_id, location_id, key_ring_id):
key_id = '{}'.format(uuid.uuid4())
key = create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, key_id)
Expand All @@ -190,6 +223,12 @@ def test_create_key_asymmetric_sign(project_id, location_id, key_ring_id):
assert key.version_template.algorithm == kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256


def test_create_key_for_import(project_id, location_id, key_ring_id, import_tests_key_id, capsys):
create_key_for_import(project_id, location_id, key_ring_id, import_tests_key_id)
out, _ = capsys.readouterr()
assert "Generated key" in out


def test_create_key_hsm(project_id, location_id, key_ring_id):
key_id = '{}'.format(uuid.uuid4())
key = create_key_hsm(project_id, location_id, key_ring_id, key_id)
Expand Down Expand Up @@ -347,6 +386,17 @@ def test_iam_remove_member(client, project_id, location_id, key_ring_id, asymmet
assert any('group:tester@google.com' in b.members for b in policy.bindings)


def test_import_manually_wrapped_key(project_id, location_id, key_ring_id, import_job_id, import_tests_key_id, capsys):
key = ec.generate_private_key(ec.SECP256R1, default_backend())
formatted_key = key.private_bytes(
serialization.Encoding.DER,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption())
import_manually_wrapped_key(project_id, location_id, key_ring_id, import_tests_key_id, import_job_id, formatted_key)
out, _ = capsys.readouterr()
assert "Imported" in out


def test_sign_asymmetric(client, project_id, location_id, key_ring_id, asymmetric_sign_rsa_key_id):
message = 'my message'

Expand Down

0 comments on commit d8e0fdf

Please sign in to comment.