From e6aaaa376c41ade96176101d1958839f4b02fc91 Mon Sep 17 00:00:00 2001 From: Kyle Fazzari Date: Tue, 7 Apr 2020 21:06:12 -0700 Subject: [PATCH 1/4] permission api: use _keystore.get_* functions Proof that we're still completely missing test coverage of some verbs. Add a basic `create_permissions` test as well. Signed-off-by: Kyle Fazzari --- sros2/sros2/api/_permission.py | 6 +- sros2/test/conftest.py | 8 ++ sros2/test/sros2/api/__init__.py | 0 .../security/verbs/test_create_permission.py | 86 +++++++++++++++++++ 4 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 sros2/test/sros2/api/__init__.py create mode 100644 sros2/test/sros2/commands/security/verbs/test_create_permission.py diff --git a/sros2/sros2/api/_permission.py b/sros2/sros2/api/_permission.py index 5b590225..7e9007eb 100644 --- a/sros2/sros2/api/_permission.py +++ b/sros2/sros2/api/_permission.py @@ -34,16 +34,16 @@ def create_permission(keystore_path, identity, policy_file_path): def create_permissions_from_policy_element(keystore_path, identity, policy_element): relative_path = os.path.normpath(identity.lstrip('/')) - key_dir = os.path.join(_keystore.keystore_context_dir(keystore_path), relative_path) + key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path) print("creating permission file for identity: '%s'" % identity) permissions_path = os.path.join(key_dir, 'permissions.xml') create_permission_file(permissions_path, _utilities.domain_id(), policy_element) signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') keystore_ca_cert_path = os.path.join( - _keystore.keystore_public_dir(keystore_path), 'ca.cert.pem') + _keystore.get_keystore_public_dir(keystore_path), 'ca.cert.pem') keystore_ca_key_path = os.path.join( - _keystore.keystore_private_dir(keystore_path), 'ca.key.pem') + _keystore.get_keystore_private_dir(keystore_path), 'ca.key.pem') _utilities.create_smime_signed_file( keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path) diff --git a/sros2/test/conftest.py b/sros2/test/conftest.py index 11debd4d..6f4ff13b 100644 --- a/sros2/test/conftest.py +++ b/sros2/test/conftest.py @@ -13,7 +13,15 @@ # limitations under the License. import os +import pathlib + +import pytest # Disable lxml2 lookup of resources on the internet by configuring it to use a proxy # that does not exist os.environ['HTTP_PROXY'] = 'http://example.com' + + +@pytest.fixture('session') +def test_policy_dir(): + return pathlib.Path(__file__).parent / 'policies' diff --git a/sros2/test/sros2/api/__init__.py b/sros2/test/sros2/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sros2/test/sros2/commands/security/verbs/test_create_permission.py b/sros2/test/sros2/commands/security/verbs/test_create_permission.py new file mode 100644 index 00000000..8533b487 --- /dev/null +++ b/sros2/test/sros2/commands/security/verbs/test_create_permission.py @@ -0,0 +1,86 @@ +# Copyright 2020 Canonical Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pathlib + +import lxml + +import pytest + +from ros2cli import cli +from sros2.api import _key, _keystore + + +# This fixture will run once for the entire module (as opposed to once per test) +@pytest.fixture(scope='module') +def security_context_dir(tmpdir_factory, test_policy_dir) -> pathlib.Path: + keystore_dir = pathlib.Path(str(tmpdir_factory.mktemp('keystore'))) + + # First, create the keystore as well as a keypair for the talker + assert _keystore.create_keystore(keystore_dir) + assert _key.create_key(keystore_dir, '/talker_listener/talker') + + security_files_dir = keystore_dir / 'contexts' / 'talker_listener' / 'talker' + assert security_files_dir.is_dir() + + # Now using that keystore, create a permissions file using the sample policy + policy_file_path = test_policy_dir / 'sample.policy.xml' + assert cli.main( + argv=[ + 'security', 'create_permission', str(keystore_dir), '/talker_listener/talker', + str(policy_file_path)]) == 0 + + # Return path to directory containing the identity's files + return security_files_dir + + +def test_create_permission(security_context_dir): + assert security_context_dir.joinpath('permissions.xml').is_file() + assert security_context_dir.joinpath('permissions.p7s').is_file() + + # Give the generated permissions XML a smoke test + tree = lxml.etree.parse(str(security_context_dir.joinpath('permissions.xml'))) + + dds = tree.getroot() + assert dds.tag == 'dds' + + permissions = list(dds.iterchildren(tag='permissions')) + assert len(permissions) == 1 + + grants = list(permissions[0].iterchildren(tag='grant')) + assert len(grants) == 1 + assert grants[0].get('name') == '/talker_listener/talker' + + allow_rules = list(grants[0].iterchildren(tag='allow_rule')) + assert len(allow_rules) == 1 + + publish_rules = list(allow_rules[0].iterchildren(tag='publish')) + assert len(publish_rules) == 1 + + subscribe_rules = list(allow_rules[0].iterchildren(tag='subscribe')) + assert len(subscribe_rules) == 1 + + published_topics_set = list(publish_rules[0].iterchildren(tag='topics')) + assert len(published_topics_set) == 1 + published_topics = [c.text for c in published_topics_set[0].iterchildren(tag='topic')] + assert len(published_topics) == 15 + + subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics')) + assert len(subscribed_topics_set) == 1 + subscribed_topics = list(subscribed_topics_set[0].iterchildren(tag='topic')) + assert len(subscribed_topics) == 14 + + # Verify that publication is allowed on chatter, but not subscription + assert 'rt/chatter' in published_topics + assert 'rt/chatter' not in subscribed_topics From 06754f8bd65083b934221dde0fdb5bff680c469d Mon Sep 17 00:00:00 2001 From: Kyle Fazzari Date: Wed, 8 Apr 2020 13:26:46 -0700 Subject: [PATCH 2/4] Support multiple RMWs in test Signed-off-by: Kyle Fazzari --- .../commands/security/verbs/test_create_permission.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sros2/test/sros2/commands/security/verbs/test_create_permission.py b/sros2/test/sros2/commands/security/verbs/test_create_permission.py index 8533b487..597cd2e2 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_permission.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_permission.py @@ -18,8 +18,9 @@ import pytest +import rclpy from ros2cli import cli -from sros2.api import _key, _keystore +from sros2.api import _key, _keystore, _permission # This fixture will run once for the entire module (as opposed to once per test) @@ -63,7 +64,10 @@ def test_create_permission(security_context_dir): assert grants[0].get('name') == '/talker_listener/talker' allow_rules = list(grants[0].iterchildren(tag='allow_rule')) - assert len(allow_rules) == 1 + if rclpy.get_rmw_implementation_identifier() in _permission._RMW_WITH_ROS_GRAPH_INFO_TOPIC: + assert len(allow_rules) == 2 + else: + assert len(allow_rules) == 1 publish_rules = list(allow_rules[0].iterchildren(tag='publish')) assert len(publish_rules) == 1 @@ -78,7 +82,7 @@ def test_create_permission(security_context_dir): subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics')) assert len(subscribed_topics_set) == 1 - subscribed_topics = list(subscribed_topics_set[0].iterchildren(tag='topic')) + subscribed_topics = [c.text for c in subscribed_topics_set[0].iterchildren(tag='topic')] assert len(subscribed_topics) == 14 # Verify that publication is allowed on chatter, but not subscription From cd0ee0d84b0141dc52b9ea0a3a4a0cc24d89dad4 Mon Sep 17 00:00:00 2001 From: Kyle Fazzari Date: Wed, 8 Apr 2020 13:37:08 -0700 Subject: [PATCH 3/4] Remove superfluous __init__.py Signed-off-by: Kyle Fazzari --- sros2/test/sros2/api/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 sros2/test/sros2/api/__init__.py diff --git a/sros2/test/sros2/api/__init__.py b/sros2/test/sros2/api/__init__.py deleted file mode 100644 index e69de29b..00000000 From e7156ce91139302c698badc3b90d1af5cd399706 Mon Sep 17 00:00:00 2001 From: Kyle Fazzari Date: Wed, 8 Apr 2020 14:13:59 -0700 Subject: [PATCH 4/4] Refactor test and validate permissions Signed-off-by: Kyle Fazzari --- .../security/verbs/test_create_permission.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/sros2/test/sros2/commands/security/verbs/test_create_permission.py b/sros2/test/sros2/commands/security/verbs/test_create_permission.py index 597cd2e2..a5ec09bd 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_permission.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_permission.py @@ -21,6 +21,10 @@ import rclpy from ros2cli import cli from sros2.api import _key, _keystore, _permission +from sros2.policy import get_transport_schema + + +_test_identity = '/talker_listener/talker' # This fixture will run once for the entire module (as opposed to once per test) @@ -30,16 +34,16 @@ def security_context_dir(tmpdir_factory, test_policy_dir) -> pathlib.Path: # First, create the keystore as well as a keypair for the talker assert _keystore.create_keystore(keystore_dir) - assert _key.create_key(keystore_dir, '/talker_listener/talker') + assert _key.create_key(keystore_dir, _test_identity) - security_files_dir = keystore_dir / 'contexts' / 'talker_listener' / 'talker' + security_files_dir = keystore_dir.joinpath(f'contexts{_test_identity}') assert security_files_dir.is_dir() # Now using that keystore, create a permissions file using the sample policy policy_file_path = test_policy_dir / 'sample.policy.xml' assert cli.main( argv=[ - 'security', 'create_permission', str(keystore_dir), '/talker_listener/talker', + 'security', 'create_permission', str(keystore_dir), _test_identity, str(policy_file_path)]) == 0 # Return path to directory containing the identity's files @@ -50,9 +54,13 @@ def test_create_permission(security_context_dir): assert security_context_dir.joinpath('permissions.xml').is_file() assert security_context_dir.joinpath('permissions.p7s').is_file() - # Give the generated permissions XML a smoke test tree = lxml.etree.parse(str(security_context_dir.joinpath('permissions.xml'))) + # Validate the schema + permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd') + permissions_xsd = lxml.etree.XMLSchema(lxml.etree.parse(permissions_xsd_path)) + permissions_xsd.assertValid(tree) + dds = tree.getroot() assert dds.tag == 'dds' @@ -61,7 +69,7 @@ def test_create_permission(security_context_dir): grants = list(permissions[0].iterchildren(tag='grant')) assert len(grants) == 1 - assert grants[0].get('name') == '/talker_listener/talker' + assert grants[0].get('name') == _test_identity allow_rules = list(grants[0].iterchildren(tag='allow_rule')) if rclpy.get_rmw_implementation_identifier() in _permission._RMW_WITH_ROS_GRAPH_INFO_TOPIC: @@ -78,12 +86,12 @@ def test_create_permission(security_context_dir): published_topics_set = list(publish_rules[0].iterchildren(tag='topics')) assert len(published_topics_set) == 1 published_topics = [c.text for c in published_topics_set[0].iterchildren(tag='topic')] - assert len(published_topics) == 15 + assert len(published_topics) > 0 subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics')) assert len(subscribed_topics_set) == 1 subscribed_topics = [c.text for c in subscribed_topics_set[0].iterchildren(tag='topic')] - assert len(subscribed_topics) == 14 + assert len(subscribed_topics) > 0 # Verify that publication is allowed on chatter, but not subscription assert 'rt/chatter' in published_topics