Skip to content

Commit

Permalink
Python string format (#2466)
Browse files Browse the repository at this point in the history
* Convert to python format strings for readability

* Merge concatenated strings

* Revert converting generated files

* Single quotes for consistency

* Just print the exception
  • Loading branch information
Timple authored Jul 23, 2021
1 parent a800b89 commit 5c61644
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 92 deletions.
2 changes: 1 addition & 1 deletion nav2_bringup/bringup/launch/multi_tb3_simulation_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def generate_launch_description():
# Define commands for launching the navigation instances
nav_instances_cmds = []
for robot in robots:
params_file = LaunchConfiguration(robot['name'] + '_params_file')
params_file = LaunchConfiguration(f"{robot['name']}_params_file")

group = GroupAction([
IncludeLaunchDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def main():
if args.turtlebot_type is not None:
sdf_file_path = os.path.join(
get_package_share_directory('turtlebot3_gazebo'), 'models',
'turtlebot3_{}'.format(args.turtlebot_type), 'model.sdf')
f'turtlebot3_{args.turtlebot_type}', 'model.sdf')
else:
sdf_file_path = args.sdf

Expand All @@ -86,7 +86,7 @@ def main():
if args.robot_namespace:
ros_params = plugin.find('ros')
ros_tf_remap = ET.SubElement(ros_params, 'remapping')
ros_tf_remap.text = '/tf:=/' + args.robot_namespace + '/tf'
ros_tf_remap.text = f'/tf:=/{args.robot_namespace}/tf'

# Set data for request
request = SpawnEntity.Request()
Expand All @@ -101,10 +101,10 @@ def main():
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future, timeout_sec=args.timeout)
if future.result() is not None:
print('response: %r' % future.result())
print(f'response: {future.result()!r}')
else:
raise RuntimeError(
'exception while calling service: %r' % future.exception())
f'exception while calling service: {future.exception()!r}')

node.get_logger().info('Done! Shutting down node.')
node.destroy_node()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


def generate_launch_description():
map_publisher = os.path.dirname(os.getenv('TEST_EXECUTABLE')) + '/test_map_saver_publisher'
map_publisher = f"{os.path.dirname(os.getenv('TEST_EXECUTABLE'))}/test_map_saver_publisher"

ld = LaunchDescription()

Expand Down
6 changes: 3 additions & 3 deletions nav2_simple_commander/nav2_simple_commander/demo_picking.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def main():
shelf_item_pose.pose.position.y = shelf_positions[request_item_location][1]
shelf_item_pose.pose.orientation.z = 1.0
shelf_item_pose.pose.orientation.w = 0.0
print('Received request for item picking at ' + request_item_location + '.')
print(f'Received request for item picking at {request_item_location}.')
navigator.goToPose(shelf_item_pose)

# Do something during our route
Expand Down Expand Up @@ -105,12 +105,12 @@ def main():
navigator.goToPose(shipping_destination)

elif result == NavigationResult.CANCELED:
print('Task at ' + request_item_location + ' was canceled. Returning to staging point...')
print(f'Task at {request_item_location} was canceled. Returning to staging point...')
initial_pose.header.stamp = navigator.get_clock().now().to_msg()
navigator.goToPose(initial_pose)

elif result == NavigationResult.FAILED:
print('Task at ' + request_item_location + ' failed!')
print(f'Task at {request_item_location} failed!')
exit(-1)

while not navigator.isNavComplete():
Expand Down
32 changes: 16 additions & 16 deletions nav2_simple_commander/nav2_simple_commander/robot_navigator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ def goThroughPoses(self, poses):
goal_msg = NavigateThroughPoses.Goal()
goal_msg.poses = poses

self.info('Navigating with ' + str(len(goal_msg.poses)) + ' goals.' + '...')
self.info(f'Navigating with {len(goal_msg.poses)} goals....')
send_goal_future = self.nav_through_poses_client.send_goal_async(goal_msg,
self._feedbackCallback)
rclpy.spin_until_future_complete(self, send_goal_future)
self.goal_handle = send_goal_future.result()

if not self.goal_handle.accepted:
self.error('Goal with ' + str(len(poses)) + ' poses was rejected!')
self.error(f'Goal with {len(poses)} poses was rejected!')
return False

self.result_future = self.goal_handle.get_result_async()
Expand Down Expand Up @@ -142,14 +142,14 @@ def followWaypoints(self, poses):
goal_msg = FollowWaypoints.Goal()
goal_msg.poses = poses

self.info('Following ' + str(len(goal_msg.poses)) + ' goals.' + '...')
self.info(f'Following {len(goal_msg.poses)} goals....')
send_goal_future = self.follow_waypoints_client.send_goal_async(goal_msg,
self._feedbackCallback)
rclpy.spin_until_future_complete(self, send_goal_future)
self.goal_handle = send_goal_future.result()

if not self.goal_handle.accepted:
self.error('Following ' + str(len(poses)) + ' waypoints request was rejected!')
self.error(f'Following {len(poses)} waypoints request was rejected!')
return False

self.result_future = self.goal_handle.get_result_async()
Expand All @@ -172,7 +172,7 @@ def isNavComplete(self):
if self.result_future.result():
self.status = self.result_future.result().status
if self.status != GoalStatus.STATUS_SUCCEEDED:
self.debug('Goal with failed with status code: {0}'.format(self.status))
self.debug(f'Goal with failed with status code: {self.status}')
return True
else:
# Timed out, still processing, not complete yet
Expand Down Expand Up @@ -227,7 +227,7 @@ def getPath(self, start, goal):
rclpy.spin_until_future_complete(self, self.result_future)
self.status = self.result_future.result().status
if self.status != GoalStatus.STATUS_SUCCEEDED:
self.warn('Getting path failed with status code: {0}'.format(self.status))
self.warn(f'Getting path failed with status code: {self.status}')
return None

return self.result_future.result().result.path
Expand Down Expand Up @@ -255,7 +255,7 @@ def getPathThroughPoses(self, start, goals):
rclpy.spin_until_future_complete(self, self.result_future)
self.status = self.result_future.result().status
if self.status != GoalStatus.STATUS_SUCCEEDED:
self.warn('Getting path failed with status code: {0}'.format(self.status))
self.warn(f'Getting path failed with status code: {self.status}')
return None

return self.result_future.result().result.path
Expand Down Expand Up @@ -322,10 +322,10 @@ def lifecycleStartup(self):
self.info('Starting up lifecycle nodes based on lifecycle_manager.')
for srv_name, srv_type in self.get_service_names_and_types():
if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
self.info('Starting up ' + srv_name)
self.info(f'Starting up {srv_name}')
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info(srv_name + ' service not available, waiting...')
self.info(f'{srv_name} service not available, waiting...')
req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().STARTUP
future = mgr_client.call_async(req)
Expand All @@ -346,10 +346,10 @@ def lifecycleShutdown(self):
self.info('Shutting down lifecycle nodes based on lifecycle_manager.')
for srv_name, srv_type in self.get_service_names_and_types():
if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
self.info('Shutting down ' + srv_name)
self.info(f'Shutting down {srv_name}')
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info(srv_name + ' service not available, waiting...')
self.info(f'{srv_name} service not available, waiting...')
req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().SHUTDOWN
future = mgr_client.call_async(req)
Expand All @@ -359,21 +359,21 @@ def lifecycleShutdown(self):

def _waitForNodeToActivate(self, node_name):
# Waits for the node within the tester namespace to become active
self.debug('Waiting for ' + node_name + ' to become active..')
node_service = node_name + '/get_state'
self.debug(f'Waiting for {node_name} to become active..')
node_service = f'{node_name}/get_state'
state_client = self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
self.info(node_service + ' service not available, waiting...')
self.info(f'{node_service} service not available, waiting...')

req = GetState.Request()
state = 'unknown'
while state != 'active':
self.debug('Getting ' + node_name + ' state...')
self.debug(f'Getting {node_name} state...')
future = state_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
state = future.result().current_state.label
self.debug('Result of get_state: %s' % state)
self.debug(f'Result of get_state: {state}')
time.sleep(2)
return

Expand Down
29 changes: 13 additions & 16 deletions nav2_system_tests/src/costmap_filters/tester_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None):
rclpy.spin_until_future_complete(self, get_result_future)
status = get_result_future.result().status
if status != GoalStatus.STATUS_SUCCEEDED:
self.info_msg('Goal failed with status code: {0}'.format(status))
self.info_msg(f'Goal failed with status code: {status}')
return False

self.info_msg('Goal succeeded!')
Expand All @@ -230,8 +230,7 @@ def checkKeepout(self, x, y):
self.warn_msg('Filter mask was not received')
elif self.isInKeepout(x, y):
self.filter_test_result = False
self.error_msg('Pose (' + str(x) + ', ' +
str(y) + ') belongs to keepout zone')
self.error_msg(f'Pose ({x}, {y}) belongs to keepout zone')
return False
return True

Expand Down Expand Up @@ -286,7 +285,7 @@ def dwbCostCloudCallback(self, msg):
self.cost_cloud_received = True

def speedLimitCallback(self, msg):
self.info_msg('Received speed limit: ' + str(msg.speed_limit))
self.info_msg(f'Received speed limit: {msg.speed_limit}')
self.checkSpeed(self.speed_it, msg.speed_limit)
self.speed_it += 1

Expand Down Expand Up @@ -340,25 +339,25 @@ def distanceFromGoal(self):
d_x = self.current_pose.position.x - self.goal_pose.position.x
d_y = self.current_pose.position.y - self.goal_pose.position.y
distance = math.sqrt(d_x * d_x + d_y * d_y)
self.info_msg('Distance from goal is: ' + str(distance))
self.info_msg(f'Distance from goal is: {distance}')
return distance

def wait_for_node_active(self, node_name: str):
# Waits for the node within the tester namespace to become active
self.info_msg('Waiting for ' + node_name + ' to become active')
node_service = node_name + '/get_state'
self.info_msg(f'Waiting for {node_name} to become active')
node_service = f'{node_name}/get_state'
state_client = self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
self.info_msg(node_service + ' service not available, waiting...')
self.info_msg(f'{node_service} service not available, waiting...')
req = GetState.Request() # empty request
state = 'UNKNOWN'
while (state != 'active'):
self.info_msg('Getting ' + node_name + ' state...')
self.info_msg(f'Getting {node_name} state...')
future = state_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
state = future.result().current_state.label
self.info_msg('Result of get_state: %s' % state)
self.info_msg(f'Result of get_state: {state}')
else:
self.error_msg('Exception while calling service: %r' %
future.exception())
Expand All @@ -372,8 +371,7 @@ def shutdown(self):
mgr_client = self.create_client(
ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(transition_service +
' service not available, waiting...')
self.info_msg(f'{transition_service} service not available, waiting...')

req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().SHUTDOWN
Expand All @@ -385,13 +383,12 @@ def shutdown(self):
self.info_msg(
'Shutting down navigation lifecycle manager complete.')
except Exception as e: # noqa: B902
self.error_msg('Service call failed %r' % (e,))
self.error_msg(f'Service call failed {e!r}')
transition_service = 'lifecycle_manager_localization/manage_nodes'
mgr_client = self.create_client(
ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(transition_service +
' service not available, waiting...')
self.info_msg(f'{transition_service} service not available, waiting...')

req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().SHUTDOWN
Expand All @@ -403,7 +400,7 @@ def shutdown(self):
self.info_msg(
'Shutting down localization lifecycle manager complete')
except Exception as e: # noqa: B902
self.error_msg('Service call failed %r' % (e,))
self.error_msg(f'Service call failed {e!r}')

def wait_for_initial_pose(self):
self.initial_pose_received = False
Expand Down
22 changes: 11 additions & 11 deletions nav2_system_tests/src/system/nav_through_poses_tester_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def runNavigateAction(self, goal_pose: Optional[Pose] = None):
rclpy.spin_until_future_complete(self, get_result_future)
status = get_result_future.result().status
if status != GoalStatus.STATUS_SUCCEEDED:
self.info_msg('Goal failed with status code: {0}'.format(status))
self.info_msg(f'Goal failed with status code: {status}')
return False

self.info_msg('Goal succeeded!')
Expand All @@ -124,22 +124,22 @@ def poseCallback(self, msg):

def wait_for_node_active(self, node_name: str):
# Waits for the node within the tester namespace to become active
self.info_msg('Waiting for ' + node_name + ' to become active')
node_service = node_name + '/get_state'
self.info_msg(f'Waiting for {node_name} to become active')
node_service = f'{node_name}/get_state'
state_client = self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
self.info_msg(node_service + ' service not available, waiting...')
self.info_msg(f'{node_service} service not available, waiting...')
req = GetState.Request() # empty request
state = 'UNKNOWN'
while (state != 'active'):
self.info_msg('Getting ' + node_name + ' state...')
self.info_msg(f'Getting {node_name} state...')
future = state_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
state = future.result().current_state.label
self.info_msg('Result of get_state: %s' % state)
self.info_msg(f'Result of get_state: {state}')
else:
self.error_msg('Exception while calling service: %r' % future.exception())
self.error_msg(f'Exception while calling service: {future.exception()!r}')
time.sleep(5)

def shutdown(self):
Expand All @@ -149,7 +149,7 @@ def shutdown(self):
transition_service = 'lifecycle_manager_navigation/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(transition_service + ' service not available, waiting...')
self.info_msg(f'{transition_service} service not available, waiting...')

req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().SHUTDOWN
Expand All @@ -160,11 +160,11 @@ def shutdown(self):
future.result()
self.info_msg('Shutting down navigation lifecycle manager complete.')
except Exception as e: # noqa: B902
self.error_msg('Service call failed %r' % (e,))
self.error_msg(f'Service call failed {e!r}')
transition_service = 'lifecycle_manager_localization/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(transition_service + ' service not available, waiting...')
self.info_msg(f'{transition_service} service not available, waiting...')

req = ManageLifecycleNodes.Request()
req.command = ManageLifecycleNodes.Request().SHUTDOWN
Expand All @@ -175,7 +175,7 @@ def shutdown(self):
future.result()
self.info_msg('Shutting down localization lifecycle manager complete')
except Exception as e: # noqa: B902
self.error_msg('Service call failed %r' % (e,))
self.error_msg(f'Service call failed {e!r}')

def wait_for_initial_pose(self):
self.initial_pose_received = False
Expand Down
Loading

0 comments on commit 5c61644

Please sign in to comment.