Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ros2bag test_record] Gets rid of time.sleep and move to using command.wait_for_output #525

Merged
merged 5 commits into from
Sep 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions ros2bag/test/test_record_qos_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
PROFILE_PATH = Path(__file__).parent / 'resources'
TEST_NODE = 'ros2bag_record_qos_profile_test_node'
TEST_NAMESPACE = 'ros2bag_record_qos_profile'
ERROR_STRING = r'\[ERROR] \[ros2bag]:'
ERROR_STRING_MSG = 'ros2bag CLI did not produce the expected output'\
'\n Expected output pattern: {}\n Actual output: {}'
OUTPUT_WAIT_TIMEOUT = 10
SHUTDOWN_TIMEOUT = 5


@pytest.mark.rostest
Expand Down Expand Up @@ -81,52 +84,67 @@ def test_qos_simple(self):
output_path = Path(self.tmpdir.name) / 'ros2bag_test_basic'
arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(),
'--output', output_path.as_posix()]
expected_string_regex = re.compile(
r'\[rosbag2_storage]: Opened database .* for READ_WRITE')
with self.launch_bag_command(arguments=arguments) as bag_command:
time.sleep(3)
bag_command.wait_for_shutdown(timeout=5)
bag_command.wait_for_output(
condition=lambda output: expected_string_regex.search(output) is not None,
timeout=OUTPUT_WAIT_TIMEOUT)
bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
assert bag_command.terminated
expected_string_regex = re.compile(ERROR_STRING)
matches = expected_string_regex.search(bag_command.output)
assert not matches, print('ros2bag CLI did not produce the expected output')
assert matches, print(
ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output))

def test_incomplete_qos_profile(self):
profile_path = PROFILE_PATH / 'incomplete_qos_profile.yaml'
output_path = Path(self.tmpdir.name) / 'ros2bag_test_incomplete'
arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(),
'--output', output_path.as_posix()]
expected_string_regex = re.compile(
r'\[rosbag2_storage]: Opened database .* for READ_WRITE')
with self.launch_bag_command(arguments=arguments) as bag_command:
time.sleep(3)
bag_command.wait_for_shutdown(timeout=5)
bag_command.wait_for_output(
condition=lambda output: expected_string_regex.search(output) is not None,
timeout=OUTPUT_WAIT_TIMEOUT)
bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
assert bag_command.terminated
expected_string_regex = re.compile(ERROR_STRING)
matches = expected_string_regex.search(bag_command.output)
assert not matches, print('ros2bag CLI did not produce the expected output')
assert matches, print(
ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output))

def test_incomplete_qos_duration(self):
profile_path = PROFILE_PATH / 'incomplete_qos_duration.yaml'
output_path = Path(self.tmpdir.name) / 'ros2bag_test_incomplete_duration'
arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(),
'--output', output_path.as_posix()]
expected_string_regex = re.compile(
r'\[ERROR] \[ros2bag]: Time overrides must include both')
with self.launch_bag_command(arguments=arguments) as bag_command:
time.sleep(3)
bag_command.wait_for_shutdown(timeout=5)
bag_command.wait_for_output(
condition=lambda output: expected_string_regex.search(output) is not None,
timeout=OUTPUT_WAIT_TIMEOUT)
bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
assert bag_command.terminated
assert bag_command.exit_code != launch_testing.asserts.EXIT_OK
expected_string_regex = re.compile(ERROR_STRING)
matches = expected_string_regex.search(bag_command.output)
assert matches, print('ros2bag CLI did not produce the expected output')
assert matches, print(
ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output))

def test_nonexistent_qos_profile(self):
profile_path = PROFILE_PATH / 'foobar.yaml'
output_path = Path(self.tmpdir.name) / 'ros2bag_test_nonexistent'
arguments = ['record', '-a', '--qos-profile-overrides-path', profile_path.as_posix(),
'--output', output_path.as_posix()]
expected_string_regex = re.compile(
r'ros2 bag record: error: argument --qos-profile-overrides-path: can\'t open')
with self.launch_bag_command(arguments=arguments) as bag_command:
time.sleep(3)
bag_command.wait_for_shutdown(timeout=5)
bag_command.wait_for_output(
condition=lambda output: expected_string_regex.search(output) is not None,
timeout=OUTPUT_WAIT_TIMEOUT)
bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
assert bag_command.terminated
assert bag_command.exit_code != launch_testing.asserts.EXIT_OK
expected_string_regex = re.compile(
r"ros2 bag record: error: argument --qos-profile-overrides-path: can't open")
matches = expected_string_regex.search(bag_command.output)
assert matches, print('ros2bag CLI did not produce the expected output')
assert matches, print(
ERROR_STRING_MSG.format(expected_string_regex.pattern, bag_command.output))