Skip to content

Commit

Permalink
Add tests for issue ros2#180 functionality
Browse files Browse the repository at this point in the history
  - Check that timers will shut down a launched system
  - Check that shutdown will cancel a timer without cancel_on_shutdown=False
  - Check that shutdown will not cancel a timre with cancel_on_shutdown=True
  • Loading branch information
Pete Baughman committed Feb 14, 2019
1 parent b8c3710 commit 1296c4e
Showing 1 changed file with 115 additions and 2 deletions.
117 changes: 115 additions & 2 deletions launch/test/launch/test_timer_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
# limitations under the License.


"""Tests for the TimerAction Action."""
import sys

import launch
import launch.actions
import launch.event_handlers


def test_multiple_launch_with_timers():
Expand All @@ -24,6 +28,11 @@ def test_multiple_launch_with_timers():

def generate_launch_description():
return launch.LaunchDescription([

launch.actions.ExecuteProcess(
cmd=[sys.executable, '-c', 'while True: pass'],
),

launch.actions.TimerAction(
period='1',
actions=[
Expand All @@ -34,9 +43,113 @@ def generate_launch_description():

ls = launch.LaunchService()
ls.include_launch_description(generate_launch_description())
assert 0 == ls.run(shutdown_when_idle=False) # Always works
assert 0 == ls.run() # Always works

ls = launch.LaunchService()
ls.include_launch_description(generate_launch_description())
# Next line hangs forever before https://github.com/ros2/launch/issues/183 was fixed.
assert 0 == ls.run(shutdown_when_idle=False)
assert 0 == ls.run()


def _shutdown_listener_factory(reasons_arr):
return launch.actions.RegisterEventHandler(
launch.event_handlers.OnShutdown(
on_shutdown=lambda event, context: reasons_arr.append(event)
)
)


def test_timer_action_sanity_check():
"""Test that timer actions work (sanity check)."""
# This test is structured like test_shutdown_preempts_timers and
# test_timer_can_block_preemption as a sanity check that the shutdown listener
# and other launch related infrastructure works as expected
shutdown_reasons = []

ld = launch.LaunchDescription([
launch.actions.ExecuteProcess(
cmd=[sys.executable, '-c', 'while True: pass'],
),

launch.actions.TimerAction(
period='1',
actions=[
launch.actions.Shutdown(reason='One second timeout')
]
),

_shutdown_listener_factory(shutdown_reasons),
])

ls = launch.LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()
assert shutdown_reasons[0].reason == 'One second timeout'


def test_shutdown_preempts_timers():
shutdown_reasons = []

ld = launch.LaunchDescription([

launch.actions.ExecuteProcess(
cmd=[sys.executable, '-c', 'while True: pass'],
),

launch.actions.TimerAction(
period='1',
actions=[
launch.actions.Shutdown(reason='fast shutdown')
]
),

launch.actions.TimerAction(
period='2',
actions=[
launch.actions.Shutdown(reason='slow shutdown')
]
),

_shutdown_listener_factory(shutdown_reasons),
])

ls = launch.LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()
assert len(shutdown_reasons) == 1
assert shutdown_reasons[0].reason == 'fast shutdown'


def test_timer_can_block_preemption():
shutdown_reasons = []

ld = launch.LaunchDescription([

launch.actions.ExecuteProcess(
cmd=[sys.executable, '-c', 'while True: pass'],
),

launch.actions.TimerAction(
period='1',
actions=[
launch.actions.Shutdown(reason='fast shutdown')
]
),

launch.actions.TimerAction(
period="2",
actions=[
launch.actions.Shutdown(reason='slow shutdown')
],
cancel_on_shutdown=False # Preempted in test_shutdown_preempts_timers, but not here
),

_shutdown_listener_factory(shutdown_reasons),
])

ls = launch.LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()
assert len(shutdown_reasons) == 2 # Should see 'shutdown' event twice because
assert shutdown_reasons[0].reason == 'fast shutdown'
assert shutdown_reasons[1].reason == 'slow shutdown'

0 comments on commit 1296c4e

Please sign in to comment.