Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delay after node creation to ensure discovery for other rmw implementations #168

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sros2/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<test_depend>ament_mypy</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>ros_testing</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>std_srvs</test_depend>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node

from std_srvs.srv import Trigger


class ClientServiceNode(Node):

def __init__(self):
super().__init__('test_generate_policy_services_node')
self.client = self.create_client(Trigger, 'test_generate_policy_services_client')
self.service = self.create_service(
Trigger, 'test_generate_policy_services_server', lambda request, response: response)

def destroy_node(self):
self.client.destroy()
self.service.destroy()
super().destroy_node()


def main(args=None):
rclpy.init(args=args)

node = ClientServiceNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
print('node stopped cleanly')
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
50 changes: 50 additions & 0 deletions sros2/test/sros2/commands/security/verbs/fixtures/pub_sub_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class PubSubNode(Node):

def __init__(self):
super().__init__('test_generate_policy_topics_node')
self.publisher = self.create_publisher(String, 'test_generate_policy_topics_pub', 1)
self.subscription = self.create_subscription(
String, 'test_generate_policy_topics_sub', lambda msg: None, 1)

def destroy_node(self):
self.publisher.destroy()
self.subscription.destroy()
super().destroy_node()


def main(args=None):
rclpy.init(args=args)

node = PubSubNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
print('node stopped cleanly')
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
272 changes: 165 additions & 107 deletions sros2/test/sros2/commands/security/verbs/test_generate_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,116 +12,174 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import sys
import tempfile
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node

import launch_testing
from launch_testing.actions import ReadyToTest
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

import rclpy
from ros2cli import cli
from rmw_implementation import get_available_rmw_implementations
from sros2.policy import load_policy
from std_msgs.msg import String
from std_srvs.srv import Trigger


def test_generate_policy_topics():
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_topics_node', context=context)

try:
# Create a publisher and subscription
node.create_publisher(String, 'test_generate_policy_topics_pub', 1)
node.create_subscription(
String, 'test_generate_policy_topics_sub', lambda msg: None, 1)

# Generate the policy for the running node
assert cli.main(
argv=['security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) == 0
finally:
node.destroy_node()
rclpy.shutdown(context=context)

# Load the policy and pull out the allowed publications and subscriptions
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_topics_node"]')
assert profile is not None
topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
assert topics_publish_allowed is not None
topics_subscribe_allowed = profile.find(path='topics[@subscribe="ALLOW"]')
assert topics_subscribe_allowed is not None

# Verify that the allowed publications include topic_pub and not topic_sub
topics = topics_publish_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 0

# Verify that the allowed subscriptions include topic_sub and not topic_pub
topics = topics_subscribe_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 0


def test_generate_policy_services():
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test-specific context so that generate_policy can still init
context = rclpy.Context()
rclpy.init(context=context)
node = rclpy.create_node('test_generate_policy_services_node', context=context)

try:
# Create a server and client
node.create_client(Trigger, 'test_generate_policy_services_client')
node.create_service(Trigger, 'test_generate_policy_services_server', lambda request,
response: response)

# Generate the policy for the running node
assert cli.main(
argv=['security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) == 0
finally:
node.destroy_node()
rclpy.shutdown(context=context)

# Load the policy and pull out allowed replies and requests
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_services_node"]')
assert profile is not None
service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
assert service_reply_allowed is not None
service_request_allowed = profile.find(path='services[@request="ALLOW"]')
assert service_request_allowed is not None

# Verify that the allowed replies include service_server and not service_client
services = service_reply_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 0

# Verify that the allowed requests include service_client and not service_server
services = service_request_allowed.findall('service')
assert len([s for s in services if s.text == 'test_generate_policy_services_client']) == 1
assert len([s for s in services if s.text == 'test_generate_policy_services_server']) == 0


# TODO(jacobperron): On Windows, this test is flakey due to nodes left-over from tests in
# other packages.
# See: https://github.com/ros2/sros2/issues/143
@pytest.mark.skipif(
'nt' == os.name, reason='flakey due to nodes left-over from tests in other packages')
def test_generate_policy_no_nodes(capsys):
with tempfile.TemporaryDirectory() as tmpdir:
assert cli.main(argv=[
'security', 'generate_policy', os.path.join(tmpdir, 'test-policy.xml')]) != 0
stderr = capsys.readouterr().err.strip()
assert stderr == 'No nodes detected in the ROS graph. No policy file was generated.'


def test_generate_policy_no_policy_file(capsys):
with pytest.raises(SystemExit) as e:
cli.main(argv=['security', 'generate_policy'])
assert e.value.code != 0
stderr = capsys.readouterr().err.strip()
assert 'following arguments are required: POLICY_FILE_PATH' in stderr


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation):
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
path_to_pub_sub_node_script = os.path.join(
os.path.dirname(__file__), 'fixtures', 'pub_sub_node.py'
)
path_to_client_srv_node_script = os.path.join(
os.path.dirname(__file__), 'fixtures', 'client_service_node.py'
)
pub_sub_node = Node(
node_executable=sys.executable,
arguments=[path_to_pub_sub_node_script],
node_name='test_generate_policy_topics_node',
additional_env=additional_env
)
client_srv_node = Node(
node_executable=sys.executable,
arguments=[path_to_client_srv_node_script],
node_name='test_generate_policy_services_node',
additional_env=additional_env
)
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
pub_sub_node, client_srv_node, ReadyToTest()
],
additional_env=additional_env
)
]
),
])


class TestSROS2GeneratePolicyVerb(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_gen_policy_command(self, arguments):
gen_policy_command_action = ExecuteProcess(
cmd=['ros2', 'security', 'generate_policy', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2security-gen-policy-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, gen_policy_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
# ignore launch_ros and ros2cli daemon nodes
filtered_patterns=['.*launch_ros*', '.*ros2cli.*'],
filtered_rmw_implementation=rmw_implementation
)
) as gen_policy_command:
yield gen_policy_command
cls.launch_gen_policy_command = launch_gen_policy_command

@launch_testing.markers.retry_on_failure(times=3)
def test_generate_policy_topics(self):
with tempfile.TemporaryDirectory() as tmpdir:
with self.launch_gen_policy_command(
arguments=[os.path.join(tmpdir, 'test-policy.xml')]
) as gen_command:
assert gen_command.wait_for_shutdown(timeout=10)
assert gen_command.exit_code == launch_testing.asserts.EXIT_OK

# Load the policy and pull out the allowed publications and subscriptions
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_topics_node"]')
assert profile is not None
topics_publish_allowed = profile.find(path='topics[@publish="ALLOW"]')
assert topics_publish_allowed is not None
topics_subscribe_allowed = profile.find(path='topics[@subscribe="ALLOW"]')
assert topics_subscribe_allowed is not None

# Verify that the allowed publications include topic_pub and not topic_sub
topics = topics_publish_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 0

# Verify that the allowed subscriptions include topic_sub and not topic_pub
topics = topics_subscribe_allowed.findall('topic')
assert len([t for t in topics if t.text == 'test_generate_policy_topics_sub']) == 1
assert len([t for t in topics if t.text == 'test_generate_policy_topics_pub']) == 0

@launch_testing.markers.retry_on_failure(times=3)
def test_generate_policy_services(self):
with tempfile.TemporaryDirectory() as tmpdir:
with self.launch_gen_policy_command(
arguments=[os.path.join(tmpdir, 'test-policy.xml')]
) as gen_command:
assert gen_command.wait_for_shutdown(timeout=10)
assert gen_command.exit_code == launch_testing.asserts.EXIT_OK

# Load the policy and pull out allowed replies and requests
policy = load_policy(os.path.join(tmpdir, 'test-policy.xml'))
profile = policy.find(
path='profiles/profile[@ns="/"][@node="test_generate_policy_services_node"]')
assert profile is not None
service_reply_allowed = profile.find(path='services[@reply="ALLOW"]')
assert service_reply_allowed is not None
service_request_allowed = profile.find(path='services[@request="ALLOW"]')
assert service_request_allowed is not None

# Verify that the allowed replies include service_server and not service_client
services = service_reply_allowed.findall('service')
assert len(
[s for s in services if s.text == 'test_generate_policy_services_server']) == 1
assert len(
[s for s in services if s.text == 'test_generate_policy_services_client']) == 0

# Verify that the allowed requests include service_client and not service_server
services = service_request_allowed.findall('service')
assert len(
[s for s in services if s.text == 'test_generate_policy_services_client']) == 1
assert len(
[s for s in services if s.text == 'test_generate_policy_services_server']) == 0

@launch_testing.markers.retry_on_failure(times=2)
def test_generate_policy_no_policy_file(self):
with self.launch_gen_policy_command(arguments=[]) as gen_command:
assert gen_command.wait_for_shutdown(timeout=10)
assert gen_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'ros2 security generate_policy: error: the following'
' arguments are required: POLICY_FILE_PATH'
],
text=gen_command.output,
strict=False
)
Loading