Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check error flags when canceling an action task #19

Merged
merged 8 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions task_manager/mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[mypy]
ignore_missing_imports = True

# Python version to be compatible with
python_version = 3.8

#
# Define type checking settings. These settings are pretty strict.
#

# Warns about per-module sections in the config file that do not match any files processed when invoking mypy.
warn_unused_configs = True

# Disallows usage of generic types that do not specify explicit type parameters.
disallow_any_generics = True

# Disallows subclassing a value of type Any.
disallow_subclassing_any = True

# Disallows calling functions without type annotations from functions with type annotations.
# Hopefully this makes adding types more attractive.
disallow_untyped_calls = True

# Disallows defining functions without type annotations or with incomplete type annotations.
disallow_untyped_defs = False

# Disallows defining functions with incomplete type annotations.
disallow_incomplete_defs = True

# Type-checks the interior of functions without type annotations.
check_untyped_defs = True

# Reports an error whenever a function with type annotations is decorated with a decorator without annotations.
disallow_untyped_decorators = True

# Warns about casting an expression to its inferred type.
warn_redundant_casts = True

# Warns about unneeded # type: ignore comments.
warn_unused_ignores = True

# Shows errors for missing return statements on some execution paths.
warn_return_any = True

# Shows a warning when encountering any code inferred to be unreachable or redundant after performing type analysis.
warn_unreachable = True

# By default, imported values to a module are treated as exported and mypy allows other modules to import them. When
# false, mypy will not re-export unless the item is imported using from-as or is included in __all__.
implicit_reexport = false

# Prohibit equality checks, identity checks, and container checks between non-overlapping types.
strict_equality = True

# Changes the treatment of arguments with a default value of None by not implicitly making their type Optional.
no_implicit_optional = True
103 changes: 76 additions & 27 deletions task_manager/task_manager/task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from rclpy.node import Node

# ROS messages
from action_msgs.msg import GoalInfo, GoalStatus
from action_msgs.msg import GoalStatus
from action_msgs.srv import CancelGoal

# Task Manager messages
from task_manager_msgs.msg import TaskStatus
Expand Down Expand Up @@ -65,6 +66,8 @@ def cancel_task(self) -> None:
class ActionTaskClient(TaskClient):
"""Task client that keeps track of a single Action task."""

DONE_STATES = [GoalStatus.STATUS_SUCCEEDED, GoalStatus.STATUS_ABORTED, GoalStatus.STATUS_CANCELED]

def __init__(
self,
node: Node,
Expand Down Expand Up @@ -94,6 +97,7 @@ def __init__(
self._client: ActionClient = action_clients[task_specs.task_name]

self._goal_handle: Optional[ClientGoalHandle] = None
self._result_future: Optional[Future] = None
self.server_wait_timeout = 10.0
self.cancel_task_timeout = 5.0 # Timeout to wait for task to cancel

Expand Down Expand Up @@ -147,27 +151,21 @@ def start_task_async(self, goal_message: Any) -> None:
raise TaskStartError(f"Goal was not accepted by the action server for the task {self.task_specs.task_name}")

self.task_details.status = TaskStatus.IN_PROGRESS
future: Future = self._goal_handle.get_result_async()
future.add_done_callback(self._goal_done_cb)
self._result_future = self._goal_handle.get_result_async()
self._result_future.add_done_callback(self._goal_done_cb)

def cancel_task(self) -> None:
"""
"""Cancel the task.

:raises CancelTaskFailedError: If cancel request fails, due to timeout or other
"""
if not self._goal_handle:
raise CancelTaskFailedError("Couldn't cancel the task, goal handle does not exist!")

# In some rare cases the goal might already be done at this point. If not, cancel it.
done_states = [GoalStatus.STATUS_SUCCEEDED, GoalStatus.STATUS_ABORTED, GoalStatus.STATUS_CANCELED]
if self._goal_handle.status not in done_states:
# There seems to be a bug in rclpy, making the return code to be 0 (ERROR_NONE),
# no matter if the cancel was rejected or accepted. So checking instead if the
# goal is within the cancelling goals.
goals_canceling = self._request_canceling(self.cancel_task_timeout)
goal_ids_cancelling = [goal_info.goal_id for goal_info in goals_canceling]
if self._goal_handle.goal_id not in goal_ids_cancelling:
self._node.get_logger().error(
f"Couldn't cancel the task. Action server {self.task_specs.topic} did not "
f"accept to cancel the goal."
)
raise CancelTaskFailedError("Couldn't cancel the task!")
if self._goal_handle.status not in self.DONE_STATES:
response = self._request_canceling(self.cancel_task_timeout)
self._handle_cancel_response(response)

# Wait until _goal_done_cb is called and callbacks have been notified
if not self.goal_done.wait(timeout=self.cancel_task_timeout):
Expand All @@ -176,7 +174,15 @@ def cancel_task(self) -> None:
f"Is the task cancel implemented correctly?"
)

def _request_canceling(self, timeout: float) -> List[GoalInfo]:
def _request_canceling(self, timeout: float) -> CancelGoal.Response:
"""Requests canceling for the goal and returns the cancel response.

:raises CancelTaskFailedError: If the cancel request timed out
:param timeout: Time to wait for cancel request to pass
:return: CancelGoal.Response
"""
if not self._goal_handle:
raise CancelTaskFailedError("Couldn't cancel the task, goal handle does not exist!")
future = self._goal_handle.cancel_goal_async()
try:
self._wait_for_future_to_complete(future, timeout=timeout)
Expand All @@ -185,17 +191,65 @@ def _request_canceling(self, timeout: float) -> List[GoalInfo]:
f"Timeout while waiting response to cancel request from server {self.task_specs.task_name}: {str(e)}."
)
raise CancelTaskFailedError("Cancel request timed out.") from e
return future.result().goals_canceling
return future.result()

def _handle_cancel_response(self, response: CancelGoal.Response) -> None:
"""Handles the response from the cancel request.

:raises CancelTaskFailedError: If the cancel request fails
"""
# There seems to be a bug in rclpy, making the return code to be 0 (ERROR_NONE),
# no matter if the cancel was rejected or accepted. So checking instead if the
# goal is within the cancelling goals.
if response.return_code == CancelGoal.Response.ERROR_UNKNOWN_GOAL_ID:
self._node.get_logger().info(
f"Action server {self.task_specs.topic} did not recognize the goal id. "
f"Maybe server has restarted during the task execution and the goal no longer exists. "
f"Considering the task canceled."
)
if not self._result_future:
raise CancelTaskFailedError("Couldn't cancel the task result future since it does not exist!")
self._result_future.cancel()

elif response.return_code == CancelGoal.Response.ERROR_GOAL_TERMINATED:
self._node.get_logger().info(
f"Action server {self.task_specs.topic} did not accept to cancel the goal. "
f"Goal seems to have already finished. Considering the task canceled."
)
if not self._result_future:
raise CancelTaskFailedError("Couldn't cancel the task result future since it does not exist!")
self._result_future.cancel()

else:
if not self._goal_handle:
raise CancelTaskFailedError("Couldn't cancel the task, goal handle does not exist!")
goal_ids_cancelling = [goal_info.goal_id for goal_info in response.goals_canceling]
if self._goal_handle.goal_id not in goal_ids_cancelling:
self._node.get_logger().error(
f"Couldn't cancel the task. Action server {self.task_specs.topic} did not "
f"accept to cancel the goal."
)
raise CancelTaskFailedError("Couldn't cancel the task!")

def _goal_done_cb(self, future: Future) -> None:
"""Called when the Action Client's goal finishes. Updates the task status and notifies callbacks further the
other callbacks.
"""Called when the Action Client's goal finishes. Updates the task status and invokes task done callbacks.

:param future: Future object giving the result of the action call.
"""
if future.cancelled():
self.task_details.status = TaskStatus.CANCELED
jonipol marked this conversation as resolved.
Show resolved Hide resolved
self.task_details.result = self.task_specs.msg_interface.Result()
jonipol marked this conversation as resolved.
Show resolved Hide resolved
else:
self._fill_in_task_details(future)

for callback in self._task_done_callbacks:
callback(self.task_specs, self.task_details)
self.goal_done.set()

def _fill_in_task_details(self, future: Future) -> None:
"""Fills in the task details based on the future result."""
result = future.result()
goal_status = result.status

try:
end_goal_status = ros_goal_status_to_task_status(goal_status)
except RuntimeError as e:
Expand All @@ -208,13 +262,8 @@ def _goal_done_cb(self, future: Future) -> None:
self.task_details.status = TaskStatus.DONE
else:
self.task_details.status = end_goal_status

self.task_details.result = result.result

for callback in self._task_done_callbacks:
callback(self.task_specs, self.task_details)
self.goal_done.set()

@staticmethod
def _wait_for_future_to_complete(future: Future, timeout: Optional[float]) -> None:
event = Event()
Expand Down
3 changes: 2 additions & 1 deletion task_manager/task_manager/task_registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def _cancel_task_of_same_type(self, task_name: str) -> None:
active_task_clients = self._active_tasks.get_active_tasks_by_name(task_name)
if not active_task_clients:
return

self._node.get_logger().info(f"Detected running task of the same type '{task_name}', canceling it.")
if len(active_task_clients) > 1:
self._node.get_logger().error("Found multiple task clients with the same name!")

Expand All @@ -167,6 +167,7 @@ def _cancel_active_blocking_task(self) -> None:
return

try:
self._node.get_logger().info("Detected running blocking task, canceling it.")
task_client.cancel_task()
except CancelTaskFailedError as e:
raise TaskStartError(
Expand Down
5 changes: 3 additions & 2 deletions task_manager/test/integration_tests/mock_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
from rclpy.action.server import ActionServer, CancelResponse, ServerGoalHandle
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.service import Service

# ROS messages
from example_interfaces.action import Fibonacci
from example_interfaces.srv import AddTwoInts


def create_fib_action_server(node, action_name):
def create_fib_action_server(node: Node, action_name: str) -> ActionServer:
"""Action server that execution time depends on the given Fibonacci goal."""
return ActionServer(
node=node,
Expand Down Expand Up @@ -97,7 +98,7 @@ def _execute_cb(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
return result


def create_add_two_ints_service(node: Node, service_name):
def create_add_two_ints_service(node: Node, service_name: str) -> Service:
"""Creates a AddTwoInts service."""

def service_callback(req: AddTwoInts.Request, result: AddTwoInts.Response) -> AddTwoInts.Response:
Expand Down
55 changes: 54 additions & 1 deletion task_manager/test/integration_tests/test_action_task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# ROS
import rclpy
from rclpy.action import CancelResponse, GoalResponse
from rclpy.action.client import ClientGoalHandle
from rclpy.executors import MultiThreadedExecutor

# Thirdparty
Expand All @@ -37,7 +38,7 @@
from task_manager.task_details import TaskDetails
from task_manager.task_specs import TaskServerType, TaskSpecs

# pylint: disable=duplicate-code
# pylint: disable=duplicate-code, protected-access
# The init is very similar to test_service_task_client, but it is fine in this case


Expand Down Expand Up @@ -191,6 +192,58 @@ def execute_cb(_goal_handle):
# Finally, wait for the goal to finish to avoid error logs
client.goal_done.wait(timeout=5)

def test_cancel_task_goal_not_known_by_server(self) -> None:
"""Action server doesn't know the goal when trying to cancel it."""
client = ActionTaskClient(self._node, self._task_details, self._task_specs, action_clients={})
client.start_task_async(Fibonacci.Goal(order=1))
self.fibonacci_server.destroy()
self.fibonacci_server = create_fib_action_server(self._node, "fibonacci")
client.cancel_task()
self.assertEqual(client.task_details.status, TaskStatus.CANCELED)

def test_goal_done_cb_called_only_once(self) -> None:
"""Checks that goal_done_cb is ran only once per goal and we do not throw error for the cancel."""

def execute_cb(_goal_handle):
_goal_handle.succeed()
return Fibonacci.Result()

client = ActionTaskClient(self._node, self._task_details, self._task_specs, action_clients={})
self.fibonacci_server.register_execute_callback(execute_cb)
client.start_task_async(Fibonacci.Goal(order=1))
client.goal_done.wait(timeout=1)
handle = ClientGoalHandle(
goal_id=client._goal_handle.goal_id, # type: ignore[union-attr]
action_client=client._client,
goal_response=Fibonacci.Result,
)
client._goal_handle = handle
client.cancel_task()
self.assertEqual(client.task_details.status, TaskStatus.DONE)

def test_cancel_task_goal_terminated_before_cancel(self) -> None:
"""Case where the goal has already been finished when trying to cancel it.

Checks that goal_done_cb is ran only once per goal and we do not throw error for the cancel.
"""
client = ActionTaskClient(self._node, self._task_details, self._task_specs, action_clients={})
client.start_task_async(Fibonacci.Goal(order=0))
client.goal_done.wait(timeout=1)

# Reset state. Simulates a state where goal has finished but the response did not arrive to task manager
handle = ClientGoalHandle(
goal_id=client._goal_handle.goal_id, # type: ignore[union-attr]
action_client=client._client,
goal_response=Fibonacci.Result,
)
client._goal_handle = handle
client._result_future._done = False # type: ignore[union-attr]
client._result_future.add_done_callback(client._goal_done_cb) # type: ignore[union-attr]
client.goal_done.clear()

client.cancel_task()
self.assertEqual(client.task_details.status, TaskStatus.CANCELED)


if __name__ == "__main__":
unittest.main()
30 changes: 28 additions & 2 deletions task_manager/test/test_task_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ------------------------------------------------------------------

import unittest
from unittest.mock import Mock
from unittest.mock import Mock, patch

# ROS
from rclpy.node import Node
Expand All @@ -26,13 +26,14 @@

# ROS messages
from action_msgs.msg import GoalStatus
from action_msgs.srv import CancelGoal
from example_interfaces.action import Fibonacci

# Task Manager messages
from task_manager_msgs.msg import TaskStatus

# Task Manager
from task_manager.task_client import ActionTaskClient, ServiceTaskClient
from task_manager.task_client import ActionTaskClient, CancelTaskFailedError, ServiceTaskClient
from task_manager.task_details import TaskDetails

# pylint: disable=protected-access
Expand Down Expand Up @@ -88,6 +89,31 @@ def test_done_cb_bad_status(self) -> None:
result = extract_values(task_client.task_details.result)
self.assertEqual(result, {"sequence": []}, msg=str(task_client.task_details.result))

def test_cancel_task_no_goal_handle(self):
"""Tests that we do not crash if goal handle does not exist."""
task_client = get_action_task_client("task_1")
self.assertRaises(CancelTaskFailedError, task_client.cancel_task)

def test_request_canceling_no_goal_handle(self):
"""Tests that we do not crash if goal handle does not exist."""
task_client = get_action_task_client("task_1")
self.assertRaises(CancelTaskFailedError, task_client._request_canceling, 1)

@patch("task_manager.task_client.ActionTaskClient._request_canceling")
def test_cancel_task_no_result_future(self, mock_request_canceling: Mock):
"""Tests that we do not crash if result future does not exist."""
cases = [
{"case": "Unknown goal id", "return_code": CancelGoal.Response.ERROR_UNKNOWN_GOAL_ID},
{"case": "Goal terminated", "return_code": CancelGoal.Response.ERROR_GOAL_TERMINATED},
]

task_client = get_action_task_client("task_1")
task_client._goal_handle = Mock()
for case in cases:
mock_request_canceling.return_value = CancelGoal.Response(return_code=case["return_code"])
with self.subTest(case["case"]):
self.assertRaises(CancelTaskFailedError, task_client.cancel_task)


class ServiceTaskClientUnittests(unittest.TestCase):
"""Unittests for ServiceTaskClient.
Expand Down
Loading