Skip to content

Commit

Permalink
Add "cancel_timeout" parameter (#21)
Browse files Browse the repository at this point in the history
Co-authored-by: Leonardo Oliveira Wellausen <leonardo.wellausen@karelics.fi>
  • Loading branch information
lowellausen and lowellausen-karelics authored Sep 9, 2024
1 parent cc88675 commit 73ca98b
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ The following tasks are available by default from the Task Manager
| \<task>.cancel_on_stop | bool | False | Whether the task should be cancelled when "STOP" task is executed. |
| \<task>.reentrant | bool | False | Allows executing multiple goals for the same task in parallel. Note that the service or action implementing the task logic should also use a reentrant callback group for enabling of this option to make sense. |
| \<task>.service_success_field | string | "" | A usual way for ROS services is to provide their successful execution status for example with the "success" field in the response message. Specify here the name of this field if you wish your task to automatically set its status to ERROR when this field value is `False`. If left empty, the task will always finish with DONE status. <br/><br/> Note: Works only for the tasks that implement their logic with a service. |
| \<task>.cancel_timeout | float | 5.0 | Time, in seconds, to wait when cancelling an ongoing task. If the time is exceeded before the cancelling is done, the task will fail. |
| enable_task_servers | bool | False | Creates new service and action topics for all the declared tasks under `/task_manager/task/<task_name>` topic, to allow easy task calling from the CLI using the ROS message interfaces instead of JSON format. Should be used for debugging and development purposes only, since the preferred task starting method through `/task_manager/execute_task` action topic allows user to also set Task ID and source. |

The parameters that have their default as "-" are mandatory.

## Getting started
Expand Down
12 changes: 5 additions & 7 deletions task_manager/task_manager/task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __init__(
self._goal_handle: Optional[ClientGoalHandle] = None
self._result_future: Optional[Future] = None
self.server_wait_timeout = 10.0
self.cancel_task_timeout = 5.0 # Timeout to wait for task to cancel

@property
def task_details(self) -> TaskDetails:
Expand Down Expand Up @@ -164,13 +163,13 @@ def cancel_task(self) -> None:

# In some rare cases the goal might already be done at this point. If not, cancel it.
if self._goal_handle.status not in self.DONE_STATES:
response = self._request_canceling(self.cancel_task_timeout)
response = self._request_canceling(self._task_specs.cancel_timeout)
self._handle_cancel_response(response)

# Wait until _goal_done_cb is called and callbacks have been notified
if not self.goal_done.wait(timeout=self.cancel_task_timeout):
if not self.goal_done.wait(timeout=self._task_specs.cancel_timeout):
raise CancelTaskFailedError(
f"Task didn't finish within {self.cancel_task_timeout} second timeout after it was cancelled. "
f"Task didn't finish within {self._task_specs.cancel_timeout} second timeout after it was cancelled. "
f"Is the task cancel implemented correctly?"
)

Expand Down Expand Up @@ -316,7 +315,6 @@ def __init__(
)

self._client = self._service_clients[task_specs.task_name]
self.cancel_task_timeout = 5.0 # Timeout to wait for task to cancel

@property
def task_details(self) -> TaskDetails:
Expand Down Expand Up @@ -360,9 +358,9 @@ def cancel_task(self) -> None:
return
self._node.get_logger().warn(
f"Currently ongoing service call to {self._task_specs.topic} cannot be cancelled. "
f"Waiting for {self.cancel_task_timeout} seconds for the task to finish."
f"Waiting for {self._task_specs.cancel_timeout} seconds for the task to finish."
)
if not self.goal_done.wait(self.cancel_task_timeout):
if not self.goal_done.wait(self._task_specs.cancel_timeout):
raise CancelTaskFailedError(f"Service call to {self._task_specs.topic} cannot be cancelled.")

def _done_callback(self, future):
Expand Down
1 change: 1 addition & 0 deletions task_manager/task_manager/task_manager_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def declare_tasks(self):
msg_interface=msg_interface,
task_server_type=detect_task_server_type(msg_interface),
service_success_field=service_success_field,
cancel_timeout=self.declare_parameter(f"{task}.cancel_timeout", 5.0).value,
)
self.known_tasks[task_specs.task_name] = task_specs

Expand Down
3 changes: 0 additions & 3 deletions task_manager/task_manager/task_registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ def __init__(
self._action_clients: Dict[str, ActionClient] = {} # [task_name, ActionClient]
self._service_clients: Dict[str, Client] = {} # [task_name, Client]

self.cancel_task_timeout = 5

def start_new_task(self, request: ExecuteTask.Goal, task_specs: TaskSpecs) -> TaskClient:
"""Starts a task and cancels any conflicting tasks.
Expand Down Expand Up @@ -118,7 +116,6 @@ def start_new_task(self, request: ExecuteTask.Goal, task_specs: TaskSpecs) -> Ta
)
msg_interface = task_specs.msg_interface.Request()

task_client.cancel_task_timeout = self.cancel_task_timeout
task_client.register_done_callback(self._task_done_cb)

task_goal_message = populate_msg(task_data=request.task_data, msg_interface=msg_interface)
Expand Down
1 change: 1 addition & 0 deletions task_manager/task_manager/task_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ class TaskSpecs:
cancel_reported_as_success: bool = False
reentrant: bool = False
service_success_field: str = ""
cancel_timeout: float = 5.0
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def add_task(self, task_specs: TaskSpecs):
Parameter(name=f"{task_specs.task_name}.msg_interface", value=task_specs.msg_interface),
Parameter(name=f"{task_specs.task_name}.blocking", value=task_specs.blocking),
Parameter(name=f"{task_specs.task_name}.cancel_on_stop", value=task_specs.cancel_on_stop),
Parameter(name=f"{task_specs.task_name}.cancel_timeout", value=task_specs.cancel_timeout),
]
)
self._tasks.append(task_specs.task_name)
Expand Down Expand Up @@ -239,6 +240,7 @@ def add_service_task_add_two_ints(self, task_name, blocking=False):
task_server_type=TaskServerType.SERVICE,
blocking=blocking,
cancel_on_stop=False,
cancel_timeout=1.5,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ def sleep_cancel_callback(_):
sleep(0.5)
return CancelResponse.ACCEPT

self._task_specs.cancel_timeout = 0.2
client = ActionTaskClient(self._node, self._task_details, self._task_specs, action_clients={})
client.cancel_task_timeout = 0.2

self.fibonacci_server.register_cancel_callback(sleep_cancel_callback)
client.start_task_async(Fibonacci.Goal(order=5))
Expand All @@ -181,8 +181,8 @@ def execute_cb(_goal_handle):
sleep(1)
return Fibonacci.Result()

self._task_specs.cancel_timeout = 0.2
client = ActionTaskClient(self._node, self._task_details, self._task_specs, action_clients={})
client.cancel_task_timeout = 0.2
self.fibonacci_server.register_execute_callback(execute_cb)
client.start_task_async(Fibonacci.Goal(order=0))
with self.assertRaises(CancelTaskFailedError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,25 @@ def test_cancel(self):
"""
service_clients = {}
with self.subTest("Happy flow"):
self._task_specs.cancel_timeout = 2.0
task_client = ServiceTaskClient(
self._node,
task_details=self._task_details,
task_specs=self._task_specs,
service_clients=service_clients,
)
task_client.cancel_task_timeout = 2.0
task_client.start_task_async(AddTwoInts.Request(a=1, b=0))
task_client.cancel_task()
self.assertEqual(task_client.task_details.status, TaskStatus.DONE)

with self.subTest("Service execution takes longer than our waiting timeout is"):
self._task_specs.cancel_timeout = 0.5
task_client = ServiceTaskClient(
self._node,
task_details=self._task_details,
task_specs=self._task_specs,
service_clients=service_clients,
)
task_client.cancel_task_timeout = 0.5
task_client.start_task_async(AddTwoInts.Request(a=1, b=0))
with self.assertRaises(CancelTaskFailedError):
task_client.cancel_task()
Expand All @@ -165,13 +165,13 @@ def test_cancel(self):
with self.subTest("Task had already finished when the cancel is called"):
logger_mock = Mock()
self._node.get_logger = Mock(return_value=logger_mock)
self._task_specs.cancel_timeout = 0.5
task_client = ServiceTaskClient(
self._node,
task_details=self._task_details,
task_specs=self._task_specs,
service_clients=service_clients,
)
task_client.cancel_task_timeout = 0.5
task_client.start_task_async(AddTwoInts.Request(a=0, b=0))
self.assertTrue(task_client.goal_done.wait(1))
task_client.cancel_task()
Expand Down
4 changes: 1 addition & 3 deletions task_manager/test/integration_tests/test_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ def test_two_non_blocking_tasks(self) -> None:
def test_blocking_task_cancelling_fails(self) -> None:
"""Sometimes blocking task cancelling might fail, for example if we try to cancel a service that executes for
more than the cancel timeout."""
self.task_manager_node.task_registrator.cancel_task_timeout = 0.1

goal_handle_1 = self.start_add_two_ints_service_task(run_time_secs=1)
goal_handle_1 = self.start_add_two_ints_service_task(run_time_secs=2)

# New blocking Service Task should cancel the previous task
goal_handle_2 = self.start_add_two_ints_service_task(run_time_secs=1)
Expand Down

0 comments on commit 73ca98b

Please sign in to comment.