From c79fdfbeaa9e197f0c521c0cdd49cf8fd9d2e8ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Tue, 10 Sep 2024 13:26:11 +0300 Subject: [PATCH 1/7] A bit of refactoring and adding return types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- .../task_manager/task_manager_node.py | 18 ++----- .../task_manager/tasks/system_tasks.py | 53 +++++++++++++------ .../task_manager_test_utils.py | 6 +-- .../integration_tests/test_system_tasks.py | 2 +- 4 files changed, 44 insertions(+), 35 deletions(-) diff --git a/task_manager/task_manager/task_manager_node.py b/task_manager/task_manager/task_manager_node.py index 4692cc2..33ab00c 100755 --- a/task_manager/task_manager/task_manager_node.py +++ b/task_manager/task_manager/task_manager_node.py @@ -29,7 +29,7 @@ from rclpy import Parameter from rclpy.action import ActionServer from rclpy.action.server import CancelResponse, ServerGoalHandle -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup +from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node from rclpy.qos import QoSDurabilityPolicy, QoSProfile, QoSReliabilityPolicy @@ -40,7 +40,6 @@ # Task Manager messages from task_manager_msgs.action import ExecuteTask from task_manager_msgs.msg import ActiveTask, ActiveTaskArray, TaskDoneResult, TaskStatus -from task_manager_msgs.srv import CancelTasks, StopTasks # Task Manager from task_manager.active_tasks import ActiveTasks @@ -159,19 +158,8 @@ def setup_system_tasks(self): stop_topic = "_" + stop_topic cancel_topic = "_" + cancel_topic - stop_service = StopTasksService(self, active_tasks=self.active_tasks) - self.create_service( - StopTasks, stop_topic, callback=stop_service.service_cb, callback_group=MutuallyExclusiveCallbackGroup() - ) - - cancel_service = CancelTasksService(self, active_tasks=self.active_tasks) - self.create_service( - CancelTasks, - cancel_topic, - callback=cancel_service.service_cb, - callback_group=MutuallyExclusiveCallbackGroup(), - ) - + stop_service = StopTasksService(self, topic=stop_topic, active_tasks=self.active_tasks) + cancel_service = CancelTasksService(self, topic=cancel_topic, active_tasks=self.active_tasks) mission = Mission(self, action_name=mission_topic, execute_task_cb=self.execute_task) self.known_tasks["system/stop"] = stop_service.get_task_specs(stop_topic) diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index 7328b75..0df1a64 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------ - +import time +from abc import ABC, abstractmethod # ROS +import rclpy +from rclpy.action.server import ActionServer, CancelResponse, ServerGoalHandle +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.node import Node # Task Manager messages @@ -27,27 +31,40 @@ from task_manager.task_specs import TaskServerType, TaskSpecs -class StopTasksService: +class SystemTask(ABC): # pylint: disable=too-few-public-methods + """Abstract class for system tasks.""" + + @staticmethod + @abstractmethod + def get_task_specs(topic: str) -> TaskSpecs: + """Returns TaskSpecs object that describes the task properties.""" + + +class StopTasksService(SystemTask): """Implements Stop-command.""" - def __init__(self, node: Node, active_tasks: ActiveTasks): - self.node = node - self.active_tasks = active_tasks + def __init__(self, node: Node, topic: str, active_tasks: ActiveTasks): + self._node = node + self._topic = topic + self._active_tasks = active_tasks + + self._node.create_service( + StopTasks, self._topic, self.service_cb, callback_group=MutuallyExclusiveCallbackGroup() + ) def service_cb(self, _request: StopTasks.Request, response: StopTasks.Response) -> StopTasks.Response: """Stops all the currently active tasks that have 'cancel_on_stop' field set to True.""" try: - self.active_tasks.cancel_tasks_on_stop() + self._active_tasks.cancel_tasks_on_stop() response.success = True except CancelTaskFailedError as e: - self.node.get_logger().error(f"Failed to stop some tasks on STOP command: {e}") + self._node.get_logger().error(f"Failed to stop some tasks on STOP command: {e}") response.success = False return response @staticmethod def get_task_specs(topic: str) -> TaskSpecs: - """Returns TaskSpecs object that describes the task properties.""" return TaskSpecs( task_name="system/stop", blocking=False, @@ -61,12 +78,17 @@ def get_task_specs(topic: str) -> TaskSpecs: ) -class CancelTasksService: +class CancelTasksService(SystemTask): """Cancel any task based on the task_id.""" - def __init__(self, node: Node, active_tasks: ActiveTasks): - self.node = node - self.active_tasks = active_tasks + def __init__(self, node: Node, topic: str, active_tasks: ActiveTasks) -> None: + self._node = node + self._topic = topic + self._active_tasks = active_tasks + + self._node.create_service( + CancelTasks, self._topic, self.service_cb, callback_group=MutuallyExclusiveCallbackGroup() + ) def service_cb(self, request: CancelTasks.Request, response: CancelTasks.Response) -> CancelTasks.Response: """Cancels the currently active tasks by given task_id.""" @@ -74,14 +96,14 @@ def service_cb(self, request: CancelTasks.Request, response: CancelTasks.Respons response.success = True for task_id in request.cancelled_tasks: try: - self.active_tasks.cancel_task(task_id) + self._active_tasks.cancel_task(task_id) except KeyError: - self.node.get_logger().warning( + self._node.get_logger().warning( f"Tried to cancel a task with ID {task_id}, but the task is not active. " f"Considering as a successful cancel." ) except CancelTaskFailedError as e: - self.node.get_logger().error(f"Failed to cancel task with ID {task_id}: {e}") + self._node.get_logger().error(f"Failed to cancel task with ID {task_id}: {e}") response.success = False continue @@ -92,7 +114,6 @@ def service_cb(self, request: CancelTasks.Request, response: CancelTasks.Respons @staticmethod def get_task_specs(topic: str) -> TaskSpecs: - """Returns TaskSpecs object that describes the task properties.""" return TaskSpecs( task_name="system/cancel_task", blocking=False, diff --git a/task_manager/test/integration_tests/task_manager_test_utils.py b/task_manager/test/integration_tests/task_manager_test_utils.py index 5eed7ea..0d9b6eb 100644 --- a/task_manager/test/integration_tests/task_manager_test_utils.py +++ b/task_manager/test/integration_tests/task_manager_test_utils.py @@ -105,7 +105,7 @@ def setUp(self) -> None: ) self._tasks_started = [] - def _active_tasks_cb(self, active_tasks_msg): + def _active_tasks_cb(self, active_tasks_msg) -> None: for active_task in active_tasks_msg.active_tasks: if active_task.task_id not in self._tasks_started: self._tasks_started.append(active_task.task_id) @@ -124,7 +124,7 @@ def _get_response(future, timeout=5) -> ClientGoalHandle: time.sleep(0.01) return future.result() - def wait_for_task_start(self, task_id: str, timeout: float = 5.0): + def wait_for_task_start(self, task_id: str, timeout: float = 5.0) -> None: """Utility function to wait for the task to be started.""" start = time.time() while task_id not in self._tasks_started: @@ -154,7 +154,7 @@ def start_add_two_ints_service_task( goal = create_add_two_ints_task_goal(task_id, task_name, run_time_secs, request) return self._start_task(goal) - def _start_task(self, goal): + def _start_task(self, goal) -> ClientGoalHandle: future: Future = self.execute_task_client.send_goal_async(goal) goal_handle = self._get_response(future) return goal_handle diff --git a/task_manager/test/integration_tests/test_system_tasks.py b/task_manager/test/integration_tests/test_system_tasks.py index 3214f42..c3a38a4 100644 --- a/task_manager/test/integration_tests/test_system_tasks.py +++ b/task_manager/test/integration_tests/test_system_tasks.py @@ -38,7 +38,7 @@ def test_cancel_task_happy_flow(self) -> None: cancel_response = self.execute_cancel_task(task_ids=["111"]) goal_handle.get_result() - self.assertTrue(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) + self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) self.assertEqual(cancel_response.result.task_status, TaskStatus.DONE) self.assertEqual( From 6a4e86ecd125401aeb319173781de3ef6ca600cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Tue, 10 Sep 2024 13:28:07 +0300 Subject: [PATCH 2/7] Add Wait task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- .../task_manager/task_manager_node.py | 5 +- .../task_manager/tasks/system_tasks.py | 65 +++++++++++++++++++ .../task_manager_test_utils.py | 17 ++++- .../integration_tests/test_system_tasks.py | 18 +++++ task_manager_msgs/CMakeLists.txt | 1 + task_manager_msgs/action/Wait.action | 4 ++ 6 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 task_manager_msgs/action/Wait.action diff --git a/task_manager/task_manager/task_manager_node.py b/task_manager/task_manager/task_manager_node.py index 33ab00c..ec1d287 100755 --- a/task_manager/task_manager/task_manager_node.py +++ b/task_manager/task_manager/task_manager_node.py @@ -48,7 +48,7 @@ from task_manager.task_registrator import DuplicateTaskIdException, ROSGoalParsingError, TaskRegistrator from task_manager.task_specs import TaskServerType, TaskSpecs from task_manager.tasks.mission import Mission -from task_manager.tasks.system_tasks import CancelTasksService, StopTasksService +from task_manager.tasks.system_tasks import CancelTasksService, StopTasksService, WaitTask from task_manager.tasks.task_action_server import TaskActionServer from task_manager.tasks.task_service_server import TaskServiceServer @@ -151,6 +151,7 @@ def setup_system_tasks(self): stop_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/stop" cancel_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/cancel_task" mission_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/mission" + wait_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/wait" if not self._enable_task_servers: # Make services hidden. Actions cannot be hidden in a same way as services are, @@ -161,10 +162,12 @@ def setup_system_tasks(self): stop_service = StopTasksService(self, topic=stop_topic, active_tasks=self.active_tasks) cancel_service = CancelTasksService(self, topic=cancel_topic, active_tasks=self.active_tasks) mission = Mission(self, action_name=mission_topic, execute_task_cb=self.execute_task) + wait = WaitTask(self, topic=wait_topic) self.known_tasks["system/stop"] = stop_service.get_task_specs(stop_topic) self.known_tasks["system/cancel_task"] = cancel_service.get_task_specs(cancel_topic) self.known_tasks["system/mission"] = mission.get_task_specs(mission_topic) + self.known_tasks["system/wait"] = wait.get_task_specs(wait_topic) def _execute_task_action_cb(self, goal_handle: ServerGoalHandle): request = goal_handle.request diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index 0df1a64..91831f8 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -20,9 +20,11 @@ import rclpy from rclpy.action.server import ActionServer, CancelResponse, ServerGoalHandle from rclpy.callback_groups import MutuallyExclusiveCallbackGroup +from rclpy.duration import Duration from rclpy.node import Node # Task Manager messages +from task_manager_msgs.action import Wait from task_manager_msgs.srv import CancelTasks, StopTasks # Task Manager @@ -125,3 +127,66 @@ def get_task_specs(topic: str) -> TaskSpecs: task_server_type=TaskServerType.SERVICE, service_success_field="success", ) + + +class WaitTask(SystemTask): # pylint: disable=too-few-public-methods + """Wait for a specified amount of time or until a cancel is called.""" + + def __init__(self, node: Node, topic: str): + self._node = node + self._topic = topic + self._server = ActionServer( + self._node, + Wait, + self._topic, + self._execute_cb, + cancel_callback=self._cancel_cb, + callback_group=MutuallyExclusiveCallbackGroup(), + ) + + def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: + """Callback for the Wait action server.""" + duration_in_seconds = goal_handle.request.duration + loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds + start_time = self._node.get_clock().now() + end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9)) + feedback_frequency_ns = int(1.0 * 1e9) + last_feedback_stamp = start_time + + while rclpy.ok() and self._node.get_clock().now() < end_time: + if goal_handle.is_cancel_requested: + goal_handle.canceled() + return Wait.Result() + + if not goal_handle.is_active: + goal_handle.abort() + return Wait.Result() + + # Publish feedback + if (self._node.get_clock().now() - last_feedback_stamp).nanoseconds >= feedback_frequency_ns: + last_feedback_stamp = self._node.get_clock().now() + goal_handle.publish_feedback( + Wait.Feedback(remaining_time=(end_time - last_feedback_stamp).nanoseconds / 1e9) + ) + + time.sleep(loop_time) + + goal_handle.succeed() + return Wait.Result() + + @staticmethod + def _cancel_cb(_goal_handle: ServerGoalHandle) -> CancelResponse: + return CancelResponse.ACCEPT + + @staticmethod + def get_task_specs(topic: str) -> TaskSpecs: + return TaskSpecs( + task_name="system/wait", + blocking=True, + cancel_on_stop=True, + topic=topic, + cancel_reported_as_success=False, + reentrant=False, + msg_interface=Wait, + task_server_type=TaskServerType.ACTION, + ) diff --git a/task_manager/test/integration_tests/task_manager_test_utils.py b/task_manager/test/integration_tests/task_manager_test_utils.py index 0d9b6eb..3a4b17f 100644 --- a/task_manager/test/integration_tests/task_manager_test_utils.py +++ b/task_manager/test/integration_tests/task_manager_test_utils.py @@ -40,7 +40,7 @@ from example_interfaces.srv import AddTwoInts # Task Manager messages -from task_manager_msgs.action import ExecuteTask +from task_manager_msgs.action import ExecuteTask, Wait from task_manager_msgs.msg import ActiveTaskArray from task_manager_msgs.srv import CancelTasks, StopTasks @@ -175,6 +175,21 @@ def execute_cancel_task(self, task_ids: List[str]): goal_handle = self._start_task(goal=goal) return goal_handle.get_result() + def execute_wait_task(self, duration: float): + """Calls system/wait with given time.""" + wait_goal = Wait.Goal(duration=duration) + task_data = json.dumps(extract_values(wait_goal)) + goal = ExecuteTask.Goal(task_name="system/wait", task_data=task_data, source="") + goal_handle = self._start_task(goal=goal) + return goal_handle.get_result() + + def start_wait_task(self, duration: float, task_id: str = uuid.uuid4()) -> ClientGoalHandle: + """Starts the wait task with given duration and returns the goal handle.""" + wait_goal = Wait.Goal(duration=duration) + task_data = json.dumps(extract_values(wait_goal)) + goal = ExecuteTask.Goal(task_id=task_id, task_name="system/wait", task_data=task_data, source="") + return self._start_task(goal) + class TestTasksNode(Node): """Launches test action servers, each with different behavior or task configuration.""" diff --git a/task_manager/test/integration_tests/test_system_tasks.py b/task_manager/test/integration_tests/test_system_tasks.py index c3a38a4..d569795 100644 --- a/task_manager/test/integration_tests/test_system_tasks.py +++ b/task_manager/test/integration_tests/test_system_tasks.py @@ -101,6 +101,24 @@ def test_stop_task(self) -> None: self.assertEqual(stop_response.result.task_result, json.dumps({"success": False})) self.assertEqual(goal_handle.get_result().status, GoalStatus.STATUS_SUCCEEDED) + def test_wait_task(self) -> None: + """Test cases for Wait system task.""" + with self.subTest("Wait task is successful"): + wait_response = self.execute_wait_task(duration=0.5) + self.assertEqual(wait_response.result.task_status, TaskStatus.DONE) + self.assertEqual(wait_response.result.task_result, json.dumps({})) + + with self.subTest("Wait task is cancelled"): + goal_handle = self.start_wait_task(duration=10.0, task_id="wait_cancel") + self.wait_for_task_start("wait_cancel") + + cancel_response = self.execute_cancel_task(task_ids=["wait_cancel"]) + wait_result = goal_handle.get_result() + + self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) + self.assertEqual(wait_result.status, GoalStatus.STATUS_ABORTED) + self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + if __name__ == "__main__": unittest.main() diff --git a/task_manager_msgs/CMakeLists.txt b/task_manager_msgs/CMakeLists.txt index 49cf3eb..12dda41 100644 --- a/task_manager_msgs/CMakeLists.txt +++ b/task_manager_msgs/CMakeLists.txt @@ -31,6 +31,7 @@ set(srv_files set(action_files "action/Mission.action" "action/ExecuteTask.action" + "action/Wait.action" ) rosidl_generate_interfaces(${PROJECT_NAME} diff --git a/task_manager_msgs/action/Wait.action b/task_manager_msgs/action/Wait.action new file mode 100644 index 0000000..09380e6 --- /dev/null +++ b/task_manager_msgs/action/Wait.action @@ -0,0 +1,4 @@ +float32 duration +--- +--- +float32 remaining_time From 81863546c32fac11915cff398f344d1b7c260629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Tue, 10 Sep 2024 16:41:19 +0300 Subject: [PATCH 3/7] Ability to wait indefinitely MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- .../task_manager/tasks/system_tasks.py | 16 ++++++++-- .../integration_tests/test_system_tasks.py | 32 +++++++++++++++++-- task_manager_msgs/action/Wait.action | 2 +- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index 91831f8..312c5fc 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------ +import sys import time from abc import ABC, abstractmethod @@ -20,8 +21,10 @@ import rclpy from rclpy.action.server import ActionServer, CancelResponse, ServerGoalHandle from rclpy.callback_groups import MutuallyExclusiveCallbackGroup +from rclpy.clock import ClockType from rclpy.duration import Duration from rclpy.node import Node +from rclpy.time import Time # Task Manager messages from task_manager_msgs.action import Wait @@ -147,9 +150,16 @@ def __init__(self, node: Node, topic: str): def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: """Callback for the Wait action server.""" duration_in_seconds = goal_handle.request.duration - loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds start_time = self._node.get_clock().now() - end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9)) + + # Wait indefinitely if duration is 0.0 + if duration_in_seconds <= 0.0: + loop_time = 0.1 + end_time = Time(nanoseconds=sys.maxsize, clock_type=ClockType.ROS_TIME) + else: + loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds + end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9)) + feedback_frequency_ns = int(1.0 * 1e9) last_feedback_stamp = start_time @@ -166,7 +176,7 @@ def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: if (self._node.get_clock().now() - last_feedback_stamp).nanoseconds >= feedback_frequency_ns: last_feedback_stamp = self._node.get_clock().now() goal_handle.publish_feedback( - Wait.Feedback(remaining_time=(end_time - last_feedback_stamp).nanoseconds / 1e9) + Wait.Feedback(remaining_time=float((end_time.nanoseconds - last_feedback_stamp.nanoseconds) / 1e9)) ) time.sleep(loop_time) diff --git a/task_manager/test/integration_tests/test_system_tasks.py b/task_manager/test/integration_tests/test_system_tasks.py index d569795..0554b60 100644 --- a/task_manager/test/integration_tests/test_system_tasks.py +++ b/task_manager/test/integration_tests/test_system_tasks.py @@ -104,9 +104,9 @@ def test_stop_task(self) -> None: def test_wait_task(self) -> None: """Test cases for Wait system task.""" with self.subTest("Wait task is successful"): - wait_response = self.execute_wait_task(duration=0.5) - self.assertEqual(wait_response.result.task_status, TaskStatus.DONE) - self.assertEqual(wait_response.result.task_result, json.dumps({})) + wait_result = self.execute_wait_task(duration=0.5) + self.assertEqual(wait_result.result.task_status, TaskStatus.DONE) + self.assertEqual(wait_result.result.task_result, json.dumps({})) with self.subTest("Wait task is cancelled"): goal_handle = self.start_wait_task(duration=10.0, task_id="wait_cancel") @@ -119,6 +119,32 @@ def test_wait_task(self) -> None: self.assertEqual(wait_result.status, GoalStatus.STATUS_ABORTED) self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + with self.subTest("Input duration is negative - Wait indefinitely"): + goal_handle = self.start_wait_task(duration=-1.0, task_id="wait_negative") + self.wait_for_task_start("wait_negative") + + # Cancel the task to not wait for an eternity + cancel_response = self.execute_cancel_task(task_ids=["wait_negative"]) + wait_result = goal_handle.get_result() + + self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) + # If the task would not stay waiting, the status would be DONE + self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + self.assertEqual(wait_result.result.task_result, json.dumps({})) + + with self.subTest("Input duration is zero - Wait indefinitely"): + goal_handle = self.start_wait_task(duration=0.0, task_id="wait_indef_cancel") + self.wait_for_task_start("wait_indef_cancel") + + # Cancel the task to not wait for an eternity + cancel_response = self.execute_cancel_task(task_ids=["wait_indef_cancel"]) + wait_result = goal_handle.get_result() + + self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) + # If the task would not stay waiting with duration 0.0, the status would be DONE + self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + self.assertEqual(wait_result.result.task_result, json.dumps({})) + if __name__ == "__main__": unittest.main() diff --git a/task_manager_msgs/action/Wait.action b/task_manager_msgs/action/Wait.action index 09380e6..9618318 100644 --- a/task_manager_msgs/action/Wait.action +++ b/task_manager_msgs/action/Wait.action @@ -1,4 +1,4 @@ float32 duration --- --- -float32 remaining_time +float64 remaining_time From 726f10fefeedc962e78c636d9d4a3f67c3504966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Wed, 11 Sep 2024 09:14:38 +0300 Subject: [PATCH 4/7] Add info of Wait task to readme MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- README.md | 14 +++++++++----- task_manager/task_manager/tasks/system_tasks.py | 3 +-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index dab22ef..04a9def 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Task Manager is your solution! 4. [Missions](#missions) 5. [Task Cancelling](#task-cancelling) 6. [Global STOP-task](#stop) +7. [Wait task](#wait) ### Tasks @@ -91,6 +92,8 @@ Tasks that are implemented using ROS Services cannot be cancelled due to their n ### Global STOP-task Task manager provides a `system/stop` task, which can be called to stop all the active tasks that have their parameter `cancel_on_stop` set to `True`. +### Wait task +The `system/wait` task is a blocking task which waits for a given time (`duration > 0.0`) or until it is cancelled (`duration <= 0.0`). ## Examples Examples and their run instructions can be found in [examples](examples) folder for: @@ -124,11 +127,12 @@ Examples and their run instructions can be found in [examples](examples) folder The following tasks are available by default from the Task Manager -| Task name | Description | Message interface | -|--------------------|----------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| -| system/mission | Starts a mission. | [Mission](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Mission.action) | -| system/cancel_task | Cancels the given tasks by Task ID. | [CancelTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/CancelTasks.srv) | -| system/stop | Cancels all the active tasks that have `cancel_on_stop` parameter set to `True`. | [StopTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/StopTasks.srv) | +| Task name | Description | Message interface | +|--------------------|-------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| +| system/mission | Starts a mission. | [Mission](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Mission.action) | +| system/cancel_task | Cancels the given tasks by Task ID. | [CancelTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/CancelTasks.srv) | +| system/stop | Cancels all the active tasks that have `cancel_on_stop` parameter set to `True`. | [StopTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/StopTasks.srv) | +| system/wait | Blocking task that waits for a given time or until cancelled with `duration <= 0.0`. | [Wait](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Wait.action) ## Parameters diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index 312c5fc..ccbc772 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------ -import sys import time from abc import ABC, abstractmethod @@ -155,7 +154,7 @@ def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: # Wait indefinitely if duration is 0.0 if duration_in_seconds <= 0.0: loop_time = 0.1 - end_time = Time(nanoseconds=sys.maxsize, clock_type=ClockType.ROS_TIME) + end_time = Time(nanoseconds=2**63 - 1, clock_type=ClockType.ROS_TIME) else: loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9)) From 3087c3b32e85b8acea82f5f8328e11a0efec4f9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Wed, 11 Sep 2024 09:19:20 +0300 Subject: [PATCH 5/7] Fix trailing whitespace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 04a9def..ace2beb 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ Tasks that are implemented using ROS Services cannot be cancelled due to their n Task manager provides a `system/stop` task, which can be called to stop all the active tasks that have their parameter `cancel_on_stop` set to `True`. ### Wait task -The `system/wait` task is a blocking task which waits for a given time (`duration > 0.0`) or until it is cancelled (`duration <= 0.0`). +The `system/wait` task is a blocking task which waits for a given time (`duration > 0.0`) or until it is cancelled (`duration <= 0.0`). ## Examples Examples and their run instructions can be found in [examples](examples) folder for: @@ -127,12 +127,12 @@ Examples and their run instructions can be found in [examples](examples) folder The following tasks are available by default from the Task Manager -| Task name | Description | Message interface | -|--------------------|-------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| -| system/mission | Starts a mission. | [Mission](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Mission.action) | -| system/cancel_task | Cancels the given tasks by Task ID. | [CancelTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/CancelTasks.srv) | -| system/stop | Cancels all the active tasks that have `cancel_on_stop` parameter set to `True`. | [StopTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/StopTasks.srv) | -| system/wait | Blocking task that waits for a given time or until cancelled with `duration <= 0.0`. | [Wait](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Wait.action) +| Task name | Description | Message interface | +|--------------------|---------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------| +| system/mission | Starts a mission. | [Mission](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Mission.action) | +| system/cancel_task | Cancels the given tasks by Task ID. | [CancelTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/CancelTasks.srv) | +| system/stop | Cancels all the active tasks that have `cancel_on_stop` parameter set to `True`. | [StopTasks](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/srv/StopTasks.srv) | +| system/wait | A blocking task which waits for a given time (`duration > 0.0`) or until it is cancelled (`duration <= 0.0`). | [Wait](https://github.com/Karelics/task_manager/blob/main/task_manager_msgs/action/Wait.action) ## Parameters From c2f1c6f2f51119e8cf8be4eb9450619310f1c33f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Wed, 11 Sep 2024 16:54:40 +0300 Subject: [PATCH 6/7] Change cancel_reported_as_success to True MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- task_manager/task_manager/tasks/system_tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index ccbc772..92fd9d6 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -159,7 +159,7 @@ def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9)) - feedback_frequency_ns = int(1.0 * 1e9) + feedback_period_ns = int(1.0 * 1e9) last_feedback_stamp = start_time while rclpy.ok() and self._node.get_clock().now() < end_time: @@ -172,7 +172,7 @@ def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: return Wait.Result() # Publish feedback - if (self._node.get_clock().now() - last_feedback_stamp).nanoseconds >= feedback_frequency_ns: + if (self._node.get_clock().now() - last_feedback_stamp).nanoseconds >= feedback_period_ns: last_feedback_stamp = self._node.get_clock().now() goal_handle.publish_feedback( Wait.Feedback(remaining_time=float((end_time.nanoseconds - last_feedback_stamp.nanoseconds) / 1e9)) @@ -194,7 +194,7 @@ def get_task_specs(topic: str) -> TaskSpecs: blocking=True, cancel_on_stop=True, topic=topic, - cancel_reported_as_success=False, + cancel_reported_as_success=True, reentrant=False, msg_interface=Wait, task_server_type=TaskServerType.ACTION, From c412b2cca906325b50a476a8749d733bb9edd86f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joni=20P=C3=B6ll=C3=A4nen?= Date: Thu, 12 Sep 2024 10:12:34 +0300 Subject: [PATCH 7/7] Back to cancel_as_success=False but make cancel of indefinite wait to report success. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joni Pöllänen --- task_manager/task_manager/tasks/system_tasks.py | 7 +++++-- task_manager/test/integration_tests/test_system_tasks.py | 6 ++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/task_manager/task_manager/tasks/system_tasks.py b/task_manager/task_manager/tasks/system_tasks.py index 92fd9d6..03f08a6 100644 --- a/task_manager/task_manager/tasks/system_tasks.py +++ b/task_manager/task_manager/tasks/system_tasks.py @@ -164,7 +164,10 @@ def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result: while rclpy.ok() and self._node.get_clock().now() < end_time: if goal_handle.is_cancel_requested: - goal_handle.canceled() + if duration_in_seconds <= 0.0: + goal_handle.succeed() + else: + goal_handle.canceled() return Wait.Result() if not goal_handle.is_active: @@ -194,7 +197,7 @@ def get_task_specs(topic: str) -> TaskSpecs: blocking=True, cancel_on_stop=True, topic=topic, - cancel_reported_as_success=True, + cancel_reported_as_success=False, reentrant=False, msg_interface=Wait, task_server_type=TaskServerType.ACTION, diff --git a/task_manager/test/integration_tests/test_system_tasks.py b/task_manager/test/integration_tests/test_system_tasks.py index 0554b60..08053f9 100644 --- a/task_manager/test/integration_tests/test_system_tasks.py +++ b/task_manager/test/integration_tests/test_system_tasks.py @@ -128,8 +128,7 @@ def test_wait_task(self) -> None: wait_result = goal_handle.get_result() self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) - # If the task would not stay waiting, the status would be DONE - self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + self.assertEqual(wait_result.result.task_status, TaskStatus.DONE) self.assertEqual(wait_result.result.task_result, json.dumps({})) with self.subTest("Input duration is zero - Wait indefinitely"): @@ -141,8 +140,7 @@ def test_wait_task(self) -> None: wait_result = goal_handle.get_result() self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED) - # If the task would not stay waiting with duration 0.0, the status would be DONE - self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED) + self.assertEqual(wait_result.result.task_status, TaskStatus.DONE) self.assertEqual(wait_result.result.task_result, json.dumps({}))