diff --git a/ros2bag/test/test_record_qos_profiles.py b/ros2bag/test/test_record_qos_profiles.py index 6eb632a013..91beba614f 100644 --- a/ros2bag/test/test_record_qos_profiles.py +++ b/ros2bag/test/test_record_qos_profiles.py @@ -36,7 +36,10 @@ PROFILE_PATH = Path(__file__).parent / 'resources' TEST_NODE = 'ros2bag_record_qos_profile_test_node' TEST_NAMESPACE = 'ros2bag_record_qos_profile' -ERROR_STRING = r'\[ERROR] \[ros2bag]:' +ERROR_STRING_MSG = 'ros2bag CLI did not produce the expected output'\ + '\n Expected output pattern: {}\n Actual output: {}' +OUTPUT_WAIT_TIMEOUT = 10 +SHUTDOWN_TIMEOUT = 5 @pytest.mark.rostest @@ -81,52 +84,67 @@ def test_qos_simple(self): output_path = Path(self.tmpdir.name) / 'ros2bag_test_basic' arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(), '--output', output_path.as_posix()] + expected_string_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_WRITE') with self.launch_bag_command(arguments=arguments) as bag_command: - time.sleep(3) - bag_command.wait_for_shutdown(timeout=5) + bag_command.wait_for_output( + condition=lambda output: expected_string_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) assert bag_command.terminated - expected_string_regex = re.compile(ERROR_STRING) matches = expected_string_regex.search(bag_command.output) - assert not matches, print('ros2bag CLI did not produce the expected output') + assert matches, print( + ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output)) def test_incomplete_qos_profile(self): profile_path = PROFILE_PATH / 'incomplete_qos_profile.yaml' output_path = Path(self.tmpdir.name) / 'ros2bag_test_incomplete' arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(), '--output', output_path.as_posix()] + expected_string_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_WRITE') with self.launch_bag_command(arguments=arguments) as bag_command: - time.sleep(3) - bag_command.wait_for_shutdown(timeout=5) + bag_command.wait_for_output( + condition=lambda output: expected_string_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) assert bag_command.terminated - expected_string_regex = re.compile(ERROR_STRING) matches = expected_string_regex.search(bag_command.output) - assert not matches, print('ros2bag CLI did not produce the expected output') + assert matches, print( + ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output)) def test_incomplete_qos_duration(self): profile_path = PROFILE_PATH / 'incomplete_qos_duration.yaml' output_path = Path(self.tmpdir.name) / 'ros2bag_test_incomplete_duration' arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(), '--output', output_path.as_posix()] + expected_string_regex = re.compile( + r'\[ERROR] \[ros2bag]: Time overrides must include both') with self.launch_bag_command(arguments=arguments) as bag_command: - time.sleep(3) - bag_command.wait_for_shutdown(timeout=5) + bag_command.wait_for_output( + condition=lambda output: expected_string_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) assert bag_command.terminated assert bag_command.exit_code != launch_testing.asserts.EXIT_OK - expected_string_regex = re.compile(ERROR_STRING) matches = expected_string_regex.search(bag_command.output) - assert matches, print('ros2bag CLI did not produce the expected output') + assert matches, print( + ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output)) def test_nonexistent_qos_profile(self): profile_path = PROFILE_PATH / 'foobar.yaml' output_path = Path(self.tmpdir.name) / 'ros2bag_test_nonexistent' arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(), '--output', output_path.as_posix()] + expected_string_regex = re.compile( + r'ros2 bag record: error: argument --qos-profile-overrides-path: can\'t open') with self.launch_bag_command(arguments=arguments) as bag_command: - time.sleep(3) - bag_command.wait_for_shutdown(timeout=5) + bag_command.wait_for_output( + condition=lambda output: expected_string_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) assert bag_command.terminated assert bag_command.exit_code != launch_testing.asserts.EXIT_OK - expected_string_regex = re.compile( - r"ros2 bag record: error: argument --qos-profile-overrides-path: can't open") matches = expected_string_regex.search(bag_command.output) - assert matches, print('ros2bag CLI did not produce the expected output') + assert matches, print( + ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output))