diff --git a/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py b/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py index 00f7012779..8ca57f41de 100644 --- a/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py +++ b/nav2_bringup/nav2_gazebo_spawner/nav2_gazebo_spawner/nav2_gazebo_spawner.py @@ -87,7 +87,7 @@ def main(): if args.robot_namespace: ros_params = plugin.find('ros') ros_tf_remap = ET.SubElement(ros_params, 'remapping') - ros_tf_remap.text = f"/tf:=/{args.robot_namespace}/tf" + ros_tf_remap.text = f'/tf:=/{args.robot_namespace}/tf' # Set data for request request = SpawnEntity.Request() diff --git a/nav2_simple_commander/nav2_simple_commander/demo_picking.py b/nav2_simple_commander/nav2_simple_commander/demo_picking.py index a93e1e60ae..8cfcb2aec3 100644 --- a/nav2_simple_commander/nav2_simple_commander/demo_picking.py +++ b/nav2_simple_commander/nav2_simple_commander/demo_picking.py @@ -75,7 +75,7 @@ def main(): shelf_item_pose.pose.position.y = shelf_positions[request_item_location][1] shelf_item_pose.pose.orientation.z = 1.0 shelf_item_pose.pose.orientation.w = 0.0 - print(f"Received request for item picking at {request_item_location}.") + print(f'Received request for item picking at {request_item_location}.') navigator.goToPose(shelf_item_pose) # Do something during our route @@ -105,12 +105,12 @@ def main(): navigator.goToPose(shipping_destination) elif result == NavigationResult.CANCELED: - print(f"Task at {request_item_location} was canceled. Returning to staging point...") + print(f'Task at {request_item_location} was canceled. Returning to staging point...') initial_pose.header.stamp = navigator.get_clock().now().to_msg() navigator.goToPose(initial_pose) elif result == NavigationResult.FAILED: - print(f"Task at {request_item_location} failed!") + print(f'Task at {request_item_location} failed!') exit(-1) while not navigator.isNavComplete(): diff --git a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py index e16b5df3b3..80c21cfb58 100644 --- a/nav2_simple_commander/nav2_simple_commander/robot_navigator.py +++ b/nav2_simple_commander/nav2_simple_commander/robot_navigator.py @@ -96,14 +96,14 @@ def goThroughPoses(self, poses): goal_msg = NavigateThroughPoses.Goal() goal_msg.poses = poses - self.info(f"Navigating with {len(goal_msg.poses)} goals....") + self.info(f'Navigating with {len(goal_msg.poses)} goals....') send_goal_future = self.nav_through_poses_client.send_goal_async(goal_msg, self._feedbackCallback) rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() if not self.goal_handle.accepted: - self.error(f"Goal with {len(poses)} poses was rejected!") + self.error(f'Goal with {len(poses)} poses was rejected!') return False self.result_future = self.goal_handle.get_result_async() @@ -142,14 +142,14 @@ def followWaypoints(self, poses): goal_msg = FollowWaypoints.Goal() goal_msg.poses = poses - self.info(f"Following {len(goal_msg.poses)} goals....") + self.info(f'Following {len(goal_msg.poses)} goals....') send_goal_future = self.follow_waypoints_client.send_goal_async(goal_msg, self._feedbackCallback) rclpy.spin_until_future_complete(self, send_goal_future) self.goal_handle = send_goal_future.result() if not self.goal_handle.accepted: - self.error(f"Following {len(poses)} waypoints request was rejected!") + self.error(f'Following {len(poses)} waypoints request was rejected!') return False self.result_future = self.goal_handle.get_result_async() @@ -322,10 +322,10 @@ def lifecycleStartup(self): self.info('Starting up lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes': - self.info(f"Starting up {srv_name}") + self.info(f'Starting up {srv_name}') mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info(f"{srv_name} service not available, waiting...") + self.info(f'{srv_name} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().STARTUP future = mgr_client.call_async(req) @@ -346,10 +346,10 @@ def lifecycleShutdown(self): self.info('Shutting down lifecycle nodes based on lifecycle_manager.') for srv_name, srv_type in self.get_service_names_and_types(): if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes': - self.info(f"Shutting down {srv_name}") + self.info(f'Shutting down {srv_name}') mgr_client = self.create_client(ManageLifecycleNodes, srv_name) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info(f"{srv_name} service not available, waiting...") + self.info(f'{srv_name} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN future = mgr_client.call_async(req) @@ -359,16 +359,16 @@ def lifecycleShutdown(self): def _waitForNodeToActivate(self, node_name): # Waits for the node within the tester namespace to become active - self.debug(f"Waiting for {node_name} to become active..") - node_service = f"{node_name}/get_state" + self.debug(f'Waiting for {node_name} to become active..') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info(f"{node_service} service not available, waiting...") + self.info(f'{node_service} service not available, waiting...') req = GetState.Request() state = 'unknown' while state != 'active': - self.debug(f"Getting {node_name} state...") + self.debug(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: diff --git a/nav2_system_tests/src/costmap_filters/tester_node.py b/nav2_system_tests/src/costmap_filters/tester_node.py index 41f0511bae..0a61116254 100755 --- a/nav2_system_tests/src/costmap_filters/tester_node.py +++ b/nav2_system_tests/src/costmap_filters/tester_node.py @@ -230,7 +230,7 @@ def checkKeepout(self, x, y): self.warn_msg('Filter mask was not received') elif self.isInKeepout(x, y): self.filter_test_result = False - self.error_msg(f"Pose ({x}, {y}) belongs to keepout zone") + self.error_msg(f'Pose ({x}, {y}) belongs to keepout zone') return False return True @@ -285,7 +285,7 @@ def dwbCostCloudCallback(self, msg): self.cost_cloud_received = True def speedLimitCallback(self, msg): - self.info_msg(f"Received speed limit: {msg.speed_limit}") + self.info_msg(f'Received speed limit: {msg.speed_limit}') self.checkSpeed(self.speed_it, msg.speed_limit) self.speed_it += 1 @@ -339,20 +339,20 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg(f"Distance from goal is: {distance}") + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg(f"Waiting for {node_name} to become active") - node_service = f"{node_name}/get_state" + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{node_service} service not available, waiting...") + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg(f"Getting {node_name} state...") + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: @@ -371,7 +371,7 @@ def shutdown(self): mgr_client = self.create_client( ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -388,7 +388,7 @@ def shutdown(self): mgr_client = self.create_client( ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN diff --git a/nav2_system_tests/src/system/nav_through_poses_tester_node.py b/nav2_system_tests/src/system/nav_through_poses_tester_node.py index 7cd95c7f7b..ac7356cfa4 100755 --- a/nav2_system_tests/src/system/nav_through_poses_tester_node.py +++ b/nav2_system_tests/src/system/nav_through_poses_tester_node.py @@ -124,15 +124,15 @@ def poseCallback(self, msg): def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg(f"Waiting for {node_name} to become active") - node_service = f"{node_name}/get_state" + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{node_service} service not available, waiting...") + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg(f"Getting {node_name} state...") + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: @@ -149,7 +149,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -164,7 +164,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN diff --git a/nav2_system_tests/src/system/nav_to_pose_tester_node.py b/nav2_system_tests/src/system/nav_to_pose_tester_node.py index aff1793176..fb038a1a59 100755 --- a/nav2_system_tests/src/system/nav_to_pose_tester_node.py +++ b/nav2_system_tests/src/system/nav_to_pose_tester_node.py @@ -133,7 +133,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None): self.error_msg('Failed GROOT_BT - Get Status from ZMQ Publisher') future_return = False except Exception as e: # noqa: B902 - self.error_msg(f"Failed GROOT_BT - ZMQ Tests: {e.__doc__}{e.message}") + self.error_msg(f'Failed GROOT_BT - ZMQ Tests: {e.__doc__}{e.message}') future_return = False self.info_msg("Waiting for 'NavigateToPose' action to complete") @@ -159,8 +159,8 @@ def grootMonitoringReloadTree(self): sock.setsockopt(zmq.RCVTIMEO, 1000) # sock.setsockopt(zmq.LINGER, 0) - sock.connect(f"tcp://localhost: {port}") - self.info_msg(f"ZMQ Server Port: {port}") + sock.connect(f'tcp://localhost: {port}') + self.info_msg(f'ZMQ Server Port: {port}') # this should fail try: @@ -206,7 +206,7 @@ def grootMonitoringGetStatus(self): # Define subscription and messages with prefix to accept. sock.setsockopt_string(zmq.SUBSCRIBE, '') port = 1666 # default publishing port for groot monitoring - sock.connect(f"tcp://127.0.0.1:{port}") + sock.connect(f'tcp://127.0.0.1:{port}') for request in range(3): try: @@ -242,20 +242,20 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg(f"Distance from goal is: {distance}") + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg(f"Waiting for {node_name} to become active") - node_service = f"{node_name}/get_state" + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{node_service} service not available, waiting...") + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg(f"Getting {node_name} state...") + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: @@ -272,7 +272,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -287,7 +287,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN diff --git a/nav2_system_tests/src/system_failure/tester_node.py b/nav2_system_tests/src/system_failure/tester_node.py index 168f161155..e4b3a9baeb 100755 --- a/nav2_system_tests/src/system_failure/tester_node.py +++ b/nav2_system_tests/src/system_failure/tester_node.py @@ -146,20 +146,20 @@ def distanceFromGoal(self): d_x = self.current_pose.position.x - self.goal_pose.position.x d_y = self.current_pose.position.y - self.goal_pose.position.y distance = math.sqrt(d_x * d_x + d_y * d_y) - self.info_msg(f"Distance from goal is: {distance}") + self.info_msg(f'Distance from goal is: {distance}') return distance def wait_for_node_active(self, node_name: str): # Waits for the node within the tester namespace to become active - self.info_msg(f"Waiting for {node_name} to become active") - node_service = f"{node_name}/get_state" + self.info_msg(f'Waiting for {node_name} to become active') + node_service = f'{node_name}/get_state' state_client = self.create_client(GetState, node_service) while not state_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{node_service} service not available, waiting...") + self.info_msg(f'{node_service} service not available, waiting...') req = GetState.Request() # empty request state = 'UNKNOWN' while (state != 'active'): - self.info_msg(f"Getting {node_name} state...") + self.info_msg(f'Getting {node_name} state...') future = state_client.call_async(req) rclpy.spin_until_future_complete(self, future) if future.result() is not None: @@ -176,7 +176,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -191,7 +191,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN diff --git a/nav2_system_tests/src/waypoint_follower/tester.py b/nav2_system_tests/src/waypoint_follower/tester.py index 5b2555b60e..e6d299caa5 100755 --- a/nav2_system_tests/src/waypoint_follower/tester.py +++ b/nav2_system_tests/src/waypoint_follower/tester.py @@ -130,7 +130,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_navigation/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN @@ -146,7 +146,7 @@ def shutdown(self): transition_service = 'lifecycle_manager_localization/manage_nodes' mgr_client = self.create_client(ManageLifecycleNodes, transition_service) while not mgr_client.wait_for_service(timeout_sec=1.0): - self.info_msg(f"{transition_service} service not available, waiting...") + self.info_msg(f'{transition_service} service not available, waiting...') req = ManageLifecycleNodes.Request() req.command = ManageLifecycleNodes.Request().SHUTDOWN diff --git a/tools/bt2img.py b/tools/bt2img.py index a1c76029a6..31480f19be 100755 --- a/tools/bt2img.py +++ b/tools/bt2img.py @@ -89,7 +89,7 @@ def main(): legend.render(args.legend) dot.format = 'png' if args.save_dot: - print(f"Saving dot to {args.save_dot}") + print(f'Saving dot to {args.save_dot}') args.save_dot.write(dot.source) dot.render(args.image_out, view=args.display) @@ -119,7 +119,7 @@ def find_behavior_tree(xml_tree, tree_name): if tree_name == tree.get('ID'): return tree - raise RuntimeError(f"No behavior tree for name {tree_name} found in the XML file") + raise RuntimeError(f'No behavior tree for name {tree_name} found in the XML file') # Generate a dot description of the root of the behavior tree. def convert2dot(behavior_tree): @@ -157,13 +157,13 @@ def add_nodes(dot, parent_dot_name, parent_node): # type, the name if provided, and the parameters. def make_label(node): label = '< ' - label += f"" + label += '' name = node.get('name') if name: - label += f"" + label += '' for (param_name, value) in node.items(): - label += f"" + label += '' label += '
{node.tag}
' + node.tag + '
{name}
' + name + '
{param_name}={value}
' + param_name + '=' + value + '
>' return label