Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: reorganize key API #192

Merged
merged 4 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 4 additions & 126 deletions sros2/sros2/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,12 @@
# limitations under the License.

from collections import namedtuple
import errno
import os
import sys

from cryptography import x509
from cryptography.hazmat.backends import default_backend as cryptography_backend
from cryptography.hazmat.primitives import serialization
from sros2.policy import load_policy

from rclpy.exceptions import InvalidNamespaceException
from rclpy.validate_namespace import validate_namespace

from sros2.policy import (
get_policy_default,
load_policy,
)

from . import _keystore, _permission, _policy, _utilities
from . import _key, _keystore, _permission, _policy

HIDDEN_NODE_PREFIX = '_'

Expand Down Expand Up @@ -77,99 +66,6 @@ def get_client_info(node, node_name):
return get_topics(node_name, node.get_client_names_and_types_by_node)


def is_key_name_valid(name):
# TODO(ivanpauno): Use validate_security_context_name when it's propagated to `rclpy`.
# This is not to bad for the moment.
# Related with https://github.com/ros2/rclpy/issues/528.
try:
return validate_namespace(name)
except InvalidNamespaceException as e:
print(e)
return False


def create_key(keystore_path, identity):
if not _keystore.is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
return False
if not is_key_name_valid(identity):
return False
print("creating key for identity: '%s'" % identity)

relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
os.makedirs(key_dir, exist_ok=True)

# symlink the CA cert in there
public_certs = ['identity_ca.cert.pem', 'permissions_ca.cert.pem']
for public_cert in public_certs:
dst = os.path.join(key_dir, public_cert)
keystore_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), public_cert)
relativepath = os.path.relpath(keystore_ca_cert_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dst)

# symlink the governance file in there
keystore_governance_path = os.path.join(
_keystore.get_keystore_context_dir(keystore_path), 'governance.p7s')
dest_governance_path = os.path.join(key_dir, 'governance.p7s')
relativepath = os.path.relpath(keystore_governance_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dest_governance_path)

keystore_identity_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), 'identity_ca.cert.pem')
keystore_identity_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'identity_ca.key.pem')

cert_path = os.path.join(key_dir, 'cert.pem')
key_path = os.path.join(key_dir, 'key.pem')
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
print('creating cert and key')
_create_key_and_cert(
keystore_identity_ca_cert_path,
keystore_identity_ca_key_path,
identity,
cert_path,
key_path
)
else:
print('found cert and key; not creating new ones!')

# create a wildcard permissions file for this node which can be overridden
# later using a policy if desired
policy_file_path = get_policy_default('policy.xml')
policy_element = _policy.get_policy('/', policy_file_path)
context_element = policy_element.find('contexts/context')
context_element.attrib['path'] = identity

permissions_path = os.path.join(key_dir, 'permissions.xml')
_permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_permissions_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'permissions_ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path,
keystore_permissions_ca_key_path,
permissions_path,
signed_permissions_path
)

return True


def list_keys(keystore_path):
contexts_path = _keystore.get_keystore_context_dir(keystore_path)
if not os.path.isdir(keystore_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), keystore_path)
if not os.path.isdir(contexts_path):
return True
for name in os.listdir(contexts_path):
if os.path.isdir(os.path.join(contexts_path, name)):
print(name)
return True


def distribute_key(source_keystore_path, taget_keystore_path):
raise NotImplementedError()

Expand All @@ -193,35 +89,17 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]):

# create keys for all provided identities
for identity in identity_names:
if not create_key(keystore_path, identity):
if not _key.create_key(keystore_path, identity):
return False
for policy_file in policy_files:
policy_tree = load_policy(policy_file)
contexts_element = policy_tree.find('contexts')
for context in contexts_element:
identity_name = context.get('path')
if identity_name not in identity_names:
if not create_key(keystore_path, identity_name):
if not _key.create_key(keystore_path, identity_name):
return False
policy_element = _policy.get_policy_from_tree(identity_name, policy_tree)
_permission.create_permissions_from_policy_element(
keystore_path, identity_name, policy_element)
return True


def _create_key_and_cert(
keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path):
# Load the CA cert and key from disk
with open(keystore_ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend())

with open(keystore_ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend())

cert, private_key = _utilities.build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]),
issuer_name=ca_cert.subject,
ca_key=ca_key)

_utilities.write_key(private_key, key_path)
_utilities.write_cert(cert, cert_path)
139 changes: 139 additions & 0 deletions sros2/sros2/api/_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2019-2020 Canonical Ltd
ruffsl marked this conversation as resolved.
Show resolved Hide resolved
# Copyright 2016-2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os

from cryptography import x509
from cryptography.hazmat.backends import default_backend as cryptography_backend
from cryptography.hazmat.primitives import serialization

from rclpy.exceptions import InvalidNamespaceException
from rclpy.validate_namespace import validate_namespace

from sros2.policy import get_policy_default

from . import _keystore, _permission, _policy, _utilities


def create_key(keystore_path, identity):
if not _keystore.is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
return False
if not _is_key_name_valid(identity):
return False
print("creating key for identity: '%s'" % identity)

relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
os.makedirs(key_dir, exist_ok=True)

# symlink the CA cert in there
public_certs = ['identity_ca.cert.pem', 'permissions_ca.cert.pem']
for public_cert in public_certs:
dst = os.path.join(key_dir, public_cert)
keystore_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), public_cert)
relativepath = os.path.relpath(keystore_ca_cert_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dst)

# symlink the governance file in there
keystore_governance_path = os.path.join(
_keystore.get_keystore_context_dir(keystore_path), 'governance.p7s')
dest_governance_path = os.path.join(key_dir, 'governance.p7s')
relativepath = os.path.relpath(keystore_governance_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dest_governance_path)

keystore_identity_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), 'identity_ca.cert.pem')
keystore_identity_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'identity_ca.key.pem')

cert_path = os.path.join(key_dir, 'cert.pem')
key_path = os.path.join(key_dir, 'key.pem')
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
print('creating cert and key')
_create_key_and_cert(
keystore_identity_ca_cert_path,
keystore_identity_ca_key_path,
identity,
cert_path,
key_path
)
else:
print('found cert and key; not creating new ones!')

# create a wildcard permissions file for this node which can be overridden
# later using a policy if desired
policy_file_path = get_policy_default('policy.xml')
policy_element = _policy.get_policy('/', policy_file_path)
context_element = policy_element.find('contexts/context')
context_element.attrib['path'] = identity

permissions_path = os.path.join(key_dir, 'permissions.xml')
_permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_permissions_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'permissions_ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path,
keystore_permissions_ca_key_path,
permissions_path,
signed_permissions_path
)

return True


def list_keys(keystore_path):
contexts_path = _keystore.get_keystore_context_dir(keystore_path)
if not os.path.isdir(keystore_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), keystore_path)
if not os.path.isdir(contexts_path):
return True
for name in os.listdir(contexts_path):
if os.path.isdir(os.path.join(contexts_path, name)):
print(name)
return True


def _is_key_name_valid(name):
artivis marked this conversation as resolved.
Show resolved Hide resolved
# TODO(ivanpauno): Use validate_security_context_name when it's propagated to `rclpy`.
# This is not to bad for the moment.
# Related with https://github.com/ros2/rclpy/issues/528.
try:
return validate_namespace(name)
except InvalidNamespaceException as e:
print(e)
return False


def _create_key_and_cert(
keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path):
# Load the CA cert and key from disk
with open(keystore_ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend())

with open(keystore_ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend())

cert, private_key = _utilities.build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]),
issuer_name=ca_cert.subject,
ca_key=ca_key)

_utilities.write_key(private_key, key_path)
_utilities.write_cert(cert, cert_path)
4 changes: 2 additions & 2 deletions sros2/sros2/verb/create_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
def DirectoriesCompleter():
return None

from sros2.api import create_key
from sros2.api import _key
from sros2.verb import VerbExtension


Expand All @@ -31,5 +31,5 @@ def add_arguments(self, parser, cli_name):
parser.add_argument('NAME', help='key name, aka ROS security context name')

def main(self, *, args):
success = create_key(args.ROOT, args.NAME)
success = _key.create_key(args.ROOT, args.NAME)
return 0 if success else 1
4 changes: 2 additions & 2 deletions sros2/sros2/verb/list_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def DirectoriesCompleter():

import sys

from sros2.api import list_keys
from sros2.api import _key
from sros2.verb import VerbExtension


Expand All @@ -33,7 +33,7 @@ def add_arguments(self, parser, cli_name):

def main(self, *, args):
try:
if list_keys(args.ROOT):
if _key.list_keys(args.ROOT):
return 0
except FileNotFoundError as e:
print('No such file or directory: {!r}'.format(e.filename), file=sys.stderr)
Expand Down
22 changes: 11 additions & 11 deletions sros2/test/sros2/test_api.py → sros2/test/sros2/api/test_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sros2.api import is_key_name_valid
from sros2.api import _key


def test_is_key_name_valid():
# Valid cases
assert is_key_name_valid('/foo')
assert is_key_name_valid('/foo/bar')
assert is_key_name_valid('/foo/bar123/_/baz_')
assert _key._is_key_name_valid('/foo')
assert _key._is_key_name_valid('/foo/bar')
assert _key._is_key_name_valid('/foo/bar123/_/baz_')

# Invalid cases
assert not is_key_name_valid('')
assert not is_key_name_valid(' ')
assert not is_key_name_valid('//')
assert not is_key_name_valid('foo')
assert not is_key_name_valid('foo/bar')
assert not is_key_name_valid('/42foo')
assert not is_key_name_valid('/foo/42bar')
assert not _key._is_key_name_valid('')
assert not _key._is_key_name_valid(' ')
assert not _key._is_key_name_valid('//')
assert not _key._is_key_name_valid('foo')
assert not _key._is_key_name_valid('foo/bar')
assert not _key._is_key_name_valid('/42foo')
assert not _key._is_key_name_valid('/foo/42bar')
4 changes: 2 additions & 2 deletions sros2/test/sros2/commands/security/verbs/test_list_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tempfile

from ros2cli import cli
from sros2.api import _keystore, create_key
from sros2.api import _key, _keystore


def test_list_keys(capsys):
Expand All @@ -26,7 +26,7 @@ def test_list_keys(capsys):
assert _keystore.create_keystore(keystore_dir)

# Now using that keystore, create a keypair
assert create_key(keystore_dir, '/test_context')
assert _key.create_key(keystore_dir, '/test_context')

# Now verify that the key we just created is included in the list
assert cli.main(argv=['security', 'list_keys', keystore_dir]) == 0
Expand Down