diff --git a/nav2_bringup/bringup/launch/multi_tb3_simulation_launch.py b/nav2_bringup/bringup/launch/multi_tb3_simulation_launch.py index dd13fae8bb..4c6a30aac0 100644 --- a/nav2_bringup/bringup/launch/multi_tb3_simulation_launch.py +++ b/nav2_bringup/bringup/launch/multi_tb3_simulation_launch.py @@ -123,7 +123,7 @@ def generate_launch_description(): # Define commands for launching the navigation instances nav_instances_cmds = [] for robot in robots: - params_file = LaunchConfiguration(robot['name'] + '_params_file') + params_file = LaunchConfiguration(f"{robot['name']}_params_file") group = GroupAction([ IncludeLaunchDescription( diff --git a/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py b/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py index 9f4daaf196..8ca57f41de 100644 --- a/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py +++ b/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py @@ -67,7 +67,7 @@ def main(): if args.turtlebot_type is not None: sdf_file_path = os.path.join( get_package_share_directory('turtlebot3_gazebo'), 'models', - 'turtlebot3_{}'.format(args.turtlebot_type), 'model.sdf') + f'turtlebot3_{args.turtlebot_type}', 'model.sdf') else: sdf_file_path = args.sdf @@ -87,7 +87,7 @@ def main(): if args.robot_namespace: ros_params = plugin.find('ros') ros_tf_remap = ET.SubElement(ros_params, 'remapping') - ros_tf_remap.text = '/tf:=/' + args.robot_namespace + '/tf' + ros_tf_remap.text = f'/tf:=/{args.robot_namespace}/tf' # Set data for request request = SpawnEntity.Request() @@ -102,10 +102,10 @@ def main(): future = client.call_async(request) rclpy.spin_until_future_complete(node, future, timeout_sec=args.timeout) if future.result() is not None: - print('response: %r' % future.result()) + print(f'response: {future.result()!r}') else: raise RuntimeError( - 'exception while calling service: %r' % future.exception()) + f'exception while calling service: {future.exception()!r}') node.get_logger().info('Done! Shutting down node.') node.destroy_node() diff --git a/nav2_map_server/test/test_launch_files/map_saver_node.launch.py b/nav2_map_server/test/test_launch_files/map_saver_node.launch.py index 9c061439c7..7287f73594 100644 --- a/nav2_map_server/test/test_launch_files/map_saver_node.launch.py +++ b/nav2_map_server/test/test_launch_files/map_saver_node.launch.py @@ -22,7 +22,7 @@ def generate_launch_description(): - map_publisher = os.path.dirname(os.getenv('TEST_EXECUTABLE')) + '/test_map_saver_publisher' + map_publisher = f"{os.path.dirname(os.getenv('TEST_EXECUTABLE'))}/test_map_saver_publisher" ld = LaunchDescription() diff --git a/nav2_simple_commander/nav2_simple_commander/demo_picking.py b/nav2_simple_commander/nav2_simple_commander/demo_picking.py index 8dbf0f7b8e..8cfcb2aec3 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_picking.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_picking.py @@ -75,7 +75,7 @@ def main(): shelf_item_pose.pose.position.y = shelf_positions[request_item_location][1] shelf_item_pose.pose.orientation.z = 1.0 shelf_item_pose.pose.orientation.w = 0.0 - print('Received request for item picking at ' + request_item_location + '.') + print(f'Received request for item picking at {request_item_location}.') navigator.goToPose(shelf_item_pose) # Do something during our route @@ -105,12 +105,12 @@ def main(): navigator.goToPose(shipping_destination) elif result == NavigationResult.CANCELED: - print('Task at ' + request_item_location + ' was canceled. Returning to staging point...') + print(f'Task at {request_item_location} was canceled. Returning to staging point...') initial_pose.header.stamp = navigator.get_clock().now().to_msg() navigator.goToPose(initial_pose) elif result == NavigationResult.FAILED: - print('Task at ' + request_item_location + ' failed!') + print(f'Task at {request_item_location} failed!') exit(-1) while not navigator.isNavComplete(): diff --git a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py index 20428fdf1b..80c21cfb58 100644 --- a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py +++ b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py @@ -96,14 +96,14 @@ def goThroughPoses(self, poses): goal_msg = NavigateThroughPoses.Goal() goal_msg.poses = poses - self.info('Navigating with ' + str(len(goal_msg.poses)) + ' goals.' + '...') + self.info(f'Navigating with {len(goal_msg.poses)} goals....') send_goal_future = self.nav_through_poses_client.send_goal_async(goal_msg, self._feedbackCallback) rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() if not self.goal_handle.accepted: - self.error('Goal with ' + str(len(poses)) + ' poses was rejected!') + self.error(f'Goal with {len(poses)} poses was rejected!') return False self.result_future = self.goal_handle.get_result_async() @@ -142,14 +142,14 @@ def followWaypoints(self, poses): goal_msg = FollowWaypoints.Goal() goal_msg.poses = poses - self.info('Following ' + str(len(goal_msg.poses)) + ' goals.' + '...') + self.info(f'Following {len(goal_msg.poses)} goals....') send_goal_future = self.follow_waypoints_client.send_goal_async(goal_msg, self._feedbackCallback) rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() if not self.goal_handle.accepted: - self.error('Following ' + str(len(poses)) + ' waypoints request was rejected!') + self.error(f'Following {len(poses)} waypoints request was rejected!') return False self.result_future = self.goal_handle.get_result_async() @@ -172,7 +172,7 @@ def isNavComplete(self): if self.result_future.result(): self.status = self.result_future.result().status if self.status != GoalStatus.STATUS_SUCCEEDED: - self.debug('Goal with failed with status code: {0}'.format(self.status)) + self.debug(f'Goal with failed with status code: {self.status}') return True else: # Timed out, still processing, not complete yet @@ -227,7 +227,7 @@ def getPath(self, start, goal): rclpy.spin_until_future_complete(self, self.result_future) self.status = self.result_future.result().status if self.status != GoalStatus.STATUS_SUCCEEDED: - self.warn('Getting path failed with status code: {0}'.format(self.status)) + self.warn(f'Getting path failed with status code: {self.status}') return None return self.result_future.result().result.path @@ -255,7 +255,7 @@ def getPathThroughPoses(self, start, goals): rclpy.spin_until_future_complete(self, self.result_future) self.status = self.result_future.result().status if self.status != GoalStatus.STATUS_SUCCEEDED: - self.warn('Getting path failed with status code: {0}'.format(self.status)) + self.warn(f'Getting path failed with status code: {self.status}') return None return self.result_future.result().result.path @@ -322,10 +322,10 @@ def lifecycleStartup(self): self.info('Starting up lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes': - self.info('Starting up ' + srv_name) + self.info(f'Starting up {srv_name}') mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info(srv_name + ' service not available, waiting...') + self.info(f'{srv_name} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().STARTUP future = mgr_client.call_async(req) @@ -346,10 +346,10 @@ def lifecycleShutdown(self): self.info('Shutting down lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes': - self.info('Shutting down ' + srv_name) + self.info(f'Shutting down {srv_name}') mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info(srv_name + ' service not available, waiting...') + self.info(f'{srv_name} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN future = mgr_client.call_async(req) @@ -359,21 +359,21 @@ def lifecycleShutdown(self): def _waitForNodeToActivate(self, node_name): # Waits for the node within the tester namespace to become active - self.debug('Waiting for ' + node_name + ' to become active..') - node_service = node_name + '/get_state' + self.debug(f'Waiting for {node_name} to become active..') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info(node_service + ' service not available, waiting...') + self.info(f'{node_service} service not available, waiting...') req = GetState.Request() state = 'unknown' while state != 'active': - self.debug('Getting ' + node_name + ' state...') + self.debug(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: state = future.result().current_state.label - self.debug('Result of get_state: %s' % state) + self.debug(f'Result of get_state: {state}') time.sleep(2) return diff --git a/nav2_system_tests/src/costmap_filters/tester_node.py b/nav2_system_tests/src/costmap_filters/tester_node.py index ca0b585b66..0a61116254 100755 --- a/nav2_system_tests/src/costmap_filters/tester_node.py +++ b/nav2_system_tests/src/costmap_filters/tester_node.py @@ -210,7 +210,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, get_result_future) status = get_result_future.result().status if status != GoalStatus.STATUS_SUCCEEDED: - self.info_msg('Goal failed with status code: {0}'.format(status)) + self.info_msg(f'Goal failed with status code: {status}') return False self.info_msg('Goal succeeded!') @@ -230,8 +230,7 @@ def checkKeepout(self, x, y): self.warn_msg('Filter mask was not received') elif self.isInKeepout(x, y): self.filter_test_result = False - self.error_msg('Pose (' + str(x) + ', ' + - str(y) + ') belongs to keepout zone') + self.error_msg(f'Pose ({x}, {y}) belongs to keepout zone') return False return True @@ -286,7 +285,7 @@ def dwbCostCloudCallback(self, msg): self.cost_cloud_received = True def speedLimitCallback(self, msg): - self.info_msg('Received speed limit: ' + str(msg.speed_limit)) + self.info_msg(f'Received speed limit: {msg.speed_limit}') self.checkSpeed(self.speed_it, msg.speed_limit) self.speed_it += 1 @@ -340,25 +339,25 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg('Distance from goal is: ' + str(distance)) + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg('Waiting for ' + node_name + ' to become active') - node_service = node_name + '/get_state' + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(node_service + ' service not available, waiting...') + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg('Getting ' + node_name + ' state...') + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: state = future.result().current_state.label - self.info_msg('Result of get_state: %s' % state) + self.info_msg(f'Result of get_state: {state}') else: self.error_msg('Exception while calling service: %r' % future.exception()) @@ -372,8 +371,7 @@ def shutdown(self): mgr_client = self.create_client( ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + - ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -385,13 +383,12 @@ def shutdown(self): self.info_msg( 'Shutting down navigation lifecycle manager complete.') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client( ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + - ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -403,7 +400,7 @@ def shutdown(self): self.info_msg( 'Shutting down localization lifecycle manager complete') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') def wait_for_initial_pose(self): self.initial_pose_received = False diff --git a/nav2_system_tests/src/system/nav_through_poses_tester_node.py b/nav2_system_tests/src/system/nav_through_poses_tester_node.py index c25e26d49e..ac7356cfa4 100755 --- a/nav2_system_tests/src/system/nav_through_poses_tester_node.py +++ b/nav2_system_tests/src/system/nav_through_poses_tester_node.py @@ -111,7 +111,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, get_result_future) status = get_result_future.result().status if status != GoalStatus.STATUS_SUCCEEDED: - self.info_msg('Goal failed with status code: {0}'.format(status)) + self.info_msg(f'Goal failed with status code: {status}') return False self.info_msg('Goal succeeded!') @@ -124,22 +124,22 @@ def poseCallback(self, msg): def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg('Waiting for ' + node_name + ' to become active') - node_service = node_name + '/get_state' + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(node_service + ' service not available, waiting...') + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg('Getting ' + node_name + ' state...') + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: state = future.result().current_state.label - self.info_msg('Result of get_state: %s' % state) + self.info_msg(f'Result of get_state: {state}') else: - self.error_msg('Exception while calling service: %r' % future.exception()) + self.error_msg(f'Exception while calling service: {future.exception()!r}') time.sleep(5) def shutdown(self): @@ -149,7 +149,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -160,11 +160,11 @@ def shutdown(self): future.result() self.info_msg('Shutting down navigation lifecycle manager complete.') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -175,7 +175,7 @@ def shutdown(self): future.result() self.info_msg('Shutting down localization lifecycle manager complete') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') def wait_for_initial_pose(self): self.initial_pose_received = False diff --git a/nav2_system_tests/src/system/nav_to_pose_tester_node.py b/nav2_system_tests/src/system/nav_to_pose_tester_node.py index 176f2e4490..2f7aca1b5c 100755 --- a/nav2_system_tests/src/system/nav_to_pose_tester_node.py +++ b/nav2_system_tests/src/system/nav_to_pose_tester_node.py @@ -133,14 +133,14 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.error_msg('Failed GROOT_BT - Get Status from ZMQ Publisher') future_return = False except Exception as e: # noqa: B902 - self.error_msg('Failed GROOT_BT - ZMQ Tests: ' + e.__doc__ + e.message) + self.error_msg(f'Failed GROOT_BT - ZMQ Tests: {e}') future_return = False self.info_msg("Waiting for 'NavigateToPose' action to complete") rclpy.spin_until_future_complete(self, get_result_future) status = get_result_future.result().status if status != GoalStatus.STATUS_SUCCEEDED: - self.info_msg('Goal failed with status code: {0}'.format(status)) + self.info_msg(f'Goal failed with status code: {status}') return False if not future_return: @@ -159,8 +159,8 @@ def grootMonitoringReloadTree(self): sock.setsockopt(zmq.RCVTIMEO, 1000) # sock.setsockopt(zmq.LINGER, 0) - sock.connect('tcp://localhost:' + str(port)) - self.info_msg('ZMQ Server Port: ' + str(port)) + sock.connect(f'tcp://localhost: {port}') + self.info_msg(f'ZMQ Server Port: {port}') # this should fail try: @@ -206,7 +206,7 @@ def grootMonitoringGetStatus(self): # Define subscription and messages with prefix to accept. sock.setsockopt_string(zmq.SUBSCRIBE, '') port = 1666 # default publishing port for groot monitoring - sock.connect('tcp://127.0.0.1:' + str(port)) + sock.connect(f'tcp://127.0.0.1:{port}') for request in range(3): try: @@ -242,27 +242,27 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg('Distance from goal is: ' + str(distance)) + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg('Waiting for ' + node_name + ' to become active') - node_service = node_name + '/get_state' + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(node_service + ' service not available, waiting...') + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg('Getting ' + node_name + ' state...') + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: state = future.result().current_state.label - self.info_msg('Result of get_state: %s' % state) + self.info_msg(f'Result of get_state: {state}') else: - self.error_msg('Exception while calling service: %r' % future.exception()) + self.error_msg(f'Exception while calling service: {future.exception()!r}') time.sleep(5) def shutdown(self): @@ -272,7 +272,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -283,11 +283,11 @@ def shutdown(self): future.result() self.info_msg('Shutting down navigation lifecycle manager complete.') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -298,7 +298,7 @@ def shutdown(self): future.result() self.info_msg('Shutting down localization lifecycle manager complete') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') def wait_for_initial_pose(self): self.initial_pose_received = False diff --git a/nav2_system_tests/src/system/test_multi_robot_launch.py b/nav2_system_tests/src/system/test_multi_robot_launch.py index b0104804ef..ff37ab2af2 100755 --- a/nav2_system_tests/src/system/test_multi_robot_launch.py +++ b/nav2_system_tests/src/system/test_multi_robot_launch.py @@ -89,7 +89,7 @@ def generate_launch_description(): # Define commands for launching the navigation instances nav_instances_cmds = [] for robot in robots: - params_file = eval(robot['name'] + '_params_file') + params_file = eval(f"{robot['name']}_params_file") group = GroupAction([ # Instances use the robot's name for namespace diff --git a/nav2_system_tests/src/system_failure/tester_node.py b/nav2_system_tests/src/system_failure/tester_node.py index 3635f70a1f..e4b3a9baeb 100755 --- a/nav2_system_tests/src/system_failure/tester_node.py +++ b/nav2_system_tests/src/system_failure/tester_node.py @@ -116,7 +116,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): rclpy.spin_until_future_complete(self, get_result_future) status = get_result_future.result().status if status != GoalStatus.STATUS_ABORTED: - self.info_msg('Goal failed with status code: {0}'.format(status)) + self.info_msg(f'Goal failed with status code: {status}') return False self.info_msg('Goal failed, as expected!') @@ -146,27 +146,27 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg('Distance from goal is: ' + str(distance)) + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg('Waiting for ' + node_name + ' to become active') - node_service = node_name + '/get_state' + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(node_service + ' service not available, waiting...') + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg('Getting ' + node_name + ' state...') + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: state = future.result().current_state.label - self.info_msg('Result of get_state: %s' % state) + self.info_msg(f'Result of get_state: {state}') else: - self.error_msg('Exception while calling service: %r' % future.exception()) + self.error_msg(f'Exception while calling service: {future.exception()!r}') time.sleep(5) def shutdown(self): @@ -176,7 +176,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -187,11 +187,11 @@ def shutdown(self): future.result() self.info_msg('Shutting down navigation lifecycle manager complete.') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -202,7 +202,7 @@ def shutdown(self): future.result() self.info_msg('Shutting down localization lifecycle manager complete') except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') def wait_for_initial_pose(self): self.initial_pose_received = False diff --git a/nav2_system_tests/src/waypoint_follower/tester.py b/nav2_system_tests/src/waypoint_follower/tester.py index beb27a0b75..e6d299caa5 100755 --- a/nav2_system_tests/src/waypoint_follower/tester.py +++ b/nav2_system_tests/src/waypoint_follower/tester.py @@ -87,7 +87,7 @@ def run(self, block): rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') if not self.goal_handle.accepted: self.error_msg('Goal rejected') @@ -105,10 +105,10 @@ def run(self, block): status = get_result_future.result().status result = get_result_future.result().result except Exception as e: # noqa: B902 - self.error_msg('Service call failed %r' % (e,)) + self.error_msg(f'Service call failed {e!r}') if status != GoalStatus.STATUS_SUCCEEDED: - self.info_msg('Goal failed with status code: {0}'.format(status)) + self.info_msg(f'Goal failed with status code: {status}') return False if len(result.missed_waypoints) > 0: self.info_msg('Goal failed to process all waypoints,' @@ -130,7 +130,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -139,14 +139,14 @@ def shutdown(self): rclpy.spin_until_future_complete(self, future) future.result() except Exception as e: # noqa: B902 - self.error_msg('%s service call failed %r' % (transition_service, e,)) + self.error_msg(f'{transition_service} service call failed {e!r}') - self.info_msg('{} finished'.format(transition_service)) + self.info_msg(f'{transition_service} finished') transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(transition_service + ' service not available, waiting...') + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -155,9 +155,9 @@ def shutdown(self): rclpy.spin_until_future_complete(self, future) future.result() except Exception as e: # noqa: B902 - self.error_msg('%s service call failed %r' % (transition_service, e,)) + self.error_msg(f'{transition_service} service call failed {e!r}') - self.info_msg('{} finished'.format(transition_service)) + self.info_msg(f'{transition_service} finished') def cancel_goal(self): cancel_future = self.goal_handle.cancel_goal_async() diff --git a/tools/bt2img.py b/tools/bt2img.py index 31c84eecec..31480f19be 100755 --- a/tools/bt2img.py +++ b/tools/bt2img.py @@ -89,7 +89,7 @@ def main(): legend.render(args.legend) dot.format = 'png' if args.save_dot: - print("Saving dot to " + str(args.save_dot)) + print(f'Saving dot to {args.save_dot}') args.save_dot.write(dot.source) dot.render(args.image_out, view=args.display) @@ -119,7 +119,7 @@ def find_behavior_tree(xml_tree, tree_name): if tree_name == tree.get('ID'): return tree - raise RuntimeError("No behavior tree for name " + tree_name + " found in the XML file") + raise RuntimeError(f'No behavior tree for name {tree_name} found in the XML file') # Generate a dot description of the root of the behavior tree. def convert2dot(behavior_tree):