diff --git a/controller_manager/controller_manager/__init__.py b/controller_manager/controller_manager/__init__.py index f49bed4d34..4a8d7daee5 100644 --- a/controller_manager/controller_manager/__init__.py +++ b/controller_manager/controller_manager/__init__.py @@ -23,6 +23,10 @@ set_hardware_component_state, switch_controllers, unload_controller, + get_parameter_from_param_file, + set_controller_parameters, + set_controller_parameters_from_param_file, + bcolors, ) __all__ = [ @@ -36,4 +40,8 @@ "set_hardware_component_state", "switch_controllers", "unload_controller", + "get_parameter_from_param_file", + "set_controller_parameters", + "set_controller_parameters_from_param_file", + "bcolors", ] diff --git a/controller_manager/controller_manager/controller_manager_services.py b/controller_manager/controller_manager/controller_manager_services.py index 00a6f37145..25cd932dab 100644 --- a/controller_manager/controller_manager/controller_manager_services.py +++ b/controller_manager/controller_manager/controller_manager_services.py @@ -26,6 +26,29 @@ ) import rclpy +import yaml +from rcl_interfaces.msg import Parameter + +# @note: The versions conditioning is added here to support the source-compatibility with Humble +# The `get_parameter_value` function is moved to `rclpy.parameter` module from `ros2param.api` module from version 3.6.0 +try: + from rclpy.parameter import get_parameter_value +except ImportError: + from ros2param.api import get_parameter_value +from ros2param.api import call_set_parameters + + +# from https://stackoverflow.com/a/287944 +class bcolors: + MAGENTA = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" class ServiceNotFoundError(Exception): @@ -219,3 +242,84 @@ def unload_controller(node, controller_manager_name, controller_name, service_ti request, service_timeout, ) + + +def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name): + with open(parameter_file) as f: + namespaced_controller = ( + controller_name if namespace == "/" else f"{namespace}/{controller_name}" + ) + parameters = yaml.safe_load(f) + if namespaced_controller in parameters: + value = parameters[namespaced_controller] + if not isinstance(value, dict) or "ros__parameters" not in value: + raise RuntimeError( + f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}" + ) + if parameter_name in parameters[namespaced_controller]["ros__parameters"]: + return parameters[namespaced_controller]["ros__parameters"][parameter_name] + else: + return None + else: + return None + + +def set_controller_parameters( + node, controller_manager_name, controller_name, parameter_name, parameter_value +): + parameter = Parameter() + parameter.name = controller_name + "." + parameter_name + parameter_string = str(parameter_value) + parameter.value = get_parameter_value(string_value=parameter_string) + + response = call_set_parameters( + node=node, node_name=controller_manager_name, parameters=[parameter] + ) + assert len(response.results) == 1 + result = response.results[0] + if result.successful: + node.get_logger().info( + bcolors.OKCYAN + + 'Setting controller param "' + + parameter_name + + '" to "' + + parameter_string + + '" for ' + + bcolors.BOLD + + controller_name + + bcolors.ENDC + ) + else: + node.get_logger().fatal( + bcolors.FAIL + + 'Could not set controller param "' + + parameter_name + + '" to "' + + parameter_string + + '" for ' + + bcolors.BOLD + + controller_name + + bcolors.ENDC + ) + return False + return True + + +def set_controller_parameters_from_param_file( + node, controller_manager_name, controller_name, parameter_file, namespace=None +): + if parameter_file: + spawner_namespace = namespace if namespace else node.get_namespace() + set_controller_parameters( + node, controller_manager_name, controller_name, "param_file", parameter_file + ) + + controller_type = get_parameter_from_param_file( + controller_name, spawner_namespace, parameter_file, "type" + ) + if controller_type: + if not set_controller_parameters( + node, controller_manager_name, controller_name, "type", controller_type + ): + return False + return True diff --git a/controller_manager/controller_manager/hardware_spawner.py b/controller_manager/controller_manager/hardware_spawner.py index 3e3a487c6a..29c0b5e97c 100644 --- a/controller_manager/controller_manager/hardware_spawner.py +++ b/controller_manager/controller_manager/hardware_spawner.py @@ -19,6 +19,7 @@ from controller_manager import ( list_hardware_components, set_hardware_component_state, + bcolors, ) from controller_manager.controller_manager_services import ServiceNotFoundError @@ -28,19 +29,6 @@ from rclpy.signals import SignalHandlerOptions -# from https://stackoverflow.com/a/287944 -class bcolors: - HEADER = "\033[95m" - OKBLUE = "\033[94m" - OKCYAN = "\033[96m" - OKGREEN = "\033[92m" - WARNING = "\033[93m" - FAIL = "\033[91m" - ENDC = "\033[0m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" - - def first_match(iterable, predicate): return next((n for n in iterable if predicate(n)), None) diff --git a/controller_manager/controller_manager/spawner.py b/controller_manager/controller_manager/spawner.py index 4b452ef1e0..f20c0a3066 100644 --- a/controller_manager/controller_manager/spawner.py +++ b/controller_manager/controller_manager/spawner.py @@ -19,7 +19,6 @@ import sys import time import warnings -import yaml from controller_manager import ( configure_controller, @@ -27,29 +26,15 @@ load_controller, switch_controllers, unload_controller, + set_controller_parameters, + set_controller_parameters_from_param_file, + bcolors, ) from controller_manager.controller_manager_services import ServiceNotFoundError import rclpy -from rcl_interfaces.msg import Parameter from rclpy.node import Node from rclpy.signals import SignalHandlerOptions -from ros2param.api import call_set_parameters -from ros2param.api import get_parameter_value - -# from https://stackoverflow.com/a/287944 - - -class bcolors: - MAGENTA = "\033[95m" - OKBLUE = "\033[94m" - OKCYAN = "\033[96m" - OKGREEN = "\033[92m" - WARNING = "\033[93m" - FAIL = "\033[91m" - ENDC = "\033[0m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" def first_match(iterable, predicate): @@ -81,24 +66,6 @@ def is_controller_loaded(node, controller_manager, controller_name, service_time return any(c.name == controller_name for c in controllers) -def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name): - with open(parameter_file) as f: - namespaced_controller = ( - controller_name if namespace == "/" else f"{namespace}/{controller_name}" - ) - parameters = yaml.safe_load(f) - if namespaced_controller in parameters: - value = parameters[namespaced_controller] - if not isinstance(value, dict) or "ros__parameters" not in value: - raise RuntimeError( - f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}" - ) - if parameter_name in parameters[namespaced_controller]["ros__parameters"]: - return parameters[namespaced_controller]["ros__parameters"][parameter_name] - else: - return None - - def main(args=None): rclpy.init(args=args, signal_handler_options=SignalHandlerOptions.NO) @@ -206,75 +173,23 @@ def main(args=None): + bcolors.ENDC ) else: - controller_type = ( - args.controller_type - if param_file is None - else get_parameter_from_param_file( - controller_name, spawner_namespace, param_file, "type" - ) - ) - if controller_type: - parameter = Parameter() - parameter.name = controller_name + ".type" - parameter.value = get_parameter_value(string_value=controller_type) - - response = call_set_parameters( - node=node, node_name=controller_manager_name, parameters=[parameter] - ) - assert len(response.results) == 1 - result = response.results[0] - if result.successful: - node.get_logger().info( - bcolors.OKCYAN - + 'Set controller type to "' - + controller_type - + '" for ' - + bcolors.BOLD - + controller_name - + bcolors.ENDC - ) - else: - node.get_logger().fatal( - bcolors.FAIL - + 'Could not set controller type to "' - + controller_type - + '" for ' - + bcolors.BOLD - + controller_name - + bcolors.ENDC - ) + if args.controller_type: + if not set_controller_parameters( + node, + controller_manager_name, + controller_name, + "type", + args.controller_type, + ): return 1 - if param_file: - parameter = Parameter() - parameter.name = controller_name + ".params_file" - parameter.value = get_parameter_value(string_value=param_file) - - response = call_set_parameters( - node=node, node_name=controller_manager_name, parameters=[parameter] - ) - assert len(response.results) == 1 - result = response.results[0] - if result.successful: - node.get_logger().info( - bcolors.OKCYAN - + 'Set controller params file to "' - + param_file - + '" for ' - + bcolors.BOLD - + controller_name - + bcolors.ENDC - ) - else: - node.get_logger().fatal( - bcolors.FAIL - + 'Could not set controller params file to "' - + param_file - + '" for ' - + bcolors.BOLD - + controller_name - + bcolors.ENDC - ) + if not set_controller_parameters_from_param_file( + node, + controller_manager_name, + controller_name, + param_file, + spawner_namespace, + ): return 1 ret = load_controller(node, controller_manager_name, controller_name) diff --git a/ros2controlcli/ros2controlcli/verb/list_controllers.py b/ros2controlcli/ros2controlcli/verb/list_controllers.py index 8367900cd5..b4ebff94cd 100644 --- a/ros2controlcli/ros2controlcli/verb/list_controllers.py +++ b/ros2controlcli/ros2controlcli/verb/list_controllers.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from controller_manager import list_controllers -from controller_manager.spawner import bcolors +from controller_manager import list_controllers, bcolors from ros2cli.node.direct import add_arguments from ros2cli.node.strategy import NodeStrategy diff --git a/ros2controlcli/ros2controlcli/verb/list_hardware_components.py b/ros2controlcli/ros2controlcli/verb/list_hardware_components.py index 6c93b65cc4..d614cbc7ea 100644 --- a/ros2controlcli/ros2controlcli/verb/list_hardware_components.py +++ b/ros2controlcli/ros2controlcli/verb/list_hardware_components.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from controller_manager import list_hardware_components -from controller_manager.spawner import bcolors +from controller_manager import list_hardware_components, bcolors from lifecycle_msgs.msg import State diff --git a/ros2controlcli/ros2controlcli/verb/list_hardware_interfaces.py b/ros2controlcli/ros2controlcli/verb/list_hardware_interfaces.py index 7aa850f3bc..4510998ad9 100644 --- a/ros2controlcli/ros2controlcli/verb/list_hardware_interfaces.py +++ b/ros2controlcli/ros2controlcli/verb/list_hardware_interfaces.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from controller_manager import list_hardware_interfaces -from controller_manager.spawner import bcolors +from controller_manager import list_hardware_interfaces, bcolors from ros2cli.node.direct import add_arguments from ros2cli.node.strategy import NodeStrategy