Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Wait task #22

Merged
merged 7 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions task_manager/task_manager/task_manager_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from rclpy import Parameter
from rclpy.action import ActionServer
from rclpy.action.server import CancelResponse, ServerGoalHandle
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSProfile, QoSReliabilityPolicy
Expand All @@ -40,7 +40,6 @@
# Task Manager messages
from task_manager_msgs.action import ExecuteTask
from task_manager_msgs.msg import ActiveTask, ActiveTaskArray, TaskDoneResult, TaskStatus
from task_manager_msgs.srv import CancelTasks, StopTasks

# Task Manager
from task_manager.active_tasks import ActiveTasks
Expand All @@ -49,7 +48,7 @@
from task_manager.task_registrator import DuplicateTaskIdException, ROSGoalParsingError, TaskRegistrator
from task_manager.task_specs import TaskServerType, TaskSpecs
from task_manager.tasks.mission import Mission
from task_manager.tasks.system_tasks import CancelTasksService, StopTasksService
from task_manager.tasks.system_tasks import CancelTasksService, StopTasksService, WaitTask
from task_manager.tasks.task_action_server import TaskActionServer
from task_manager.tasks.task_service_server import TaskServiceServer

Expand Down Expand Up @@ -152,31 +151,23 @@ def setup_system_tasks(self):
stop_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/stop"
cancel_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/cancel_task"
mission_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/mission"
wait_topic = f"{self.task_registrator.TASK_TOPIC_PREFIX}/system/wait"

if not self._enable_task_servers:
# Make services hidden. Actions cannot be hidden in a same way as services are,
# so Missions are always public.
stop_topic = "_" + stop_topic
cancel_topic = "_" + cancel_topic

stop_service = StopTasksService(self, active_tasks=self.active_tasks)
self.create_service(
StopTasks, stop_topic, callback=stop_service.service_cb, callback_group=MutuallyExclusiveCallbackGroup()
)

cancel_service = CancelTasksService(self, active_tasks=self.active_tasks)
self.create_service(
CancelTasks,
cancel_topic,
callback=cancel_service.service_cb,
callback_group=MutuallyExclusiveCallbackGroup(),
)

stop_service = StopTasksService(self, topic=stop_topic, active_tasks=self.active_tasks)
cancel_service = CancelTasksService(self, topic=cancel_topic, active_tasks=self.active_tasks)
mission = Mission(self, action_name=mission_topic, execute_task_cb=self.execute_task)
wait = WaitTask(self, topic=wait_topic)

self.known_tasks["system/stop"] = stop_service.get_task_specs(stop_topic)
self.known_tasks["system/cancel_task"] = cancel_service.get_task_specs(cancel_topic)
self.known_tasks["system/mission"] = mission.get_task_specs(mission_topic)
self.known_tasks["system/wait"] = wait.get_task_specs(wait_topic)

def _execute_task_action_cb(self, goal_handle: ServerGoalHandle):
request = goal_handle.request
Expand Down
128 changes: 112 additions & 16 deletions task_manager/task_manager/tasks/system_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------------

import sys
import time
from abc import ABC, abstractmethod

# ROS
import rclpy
from rclpy.action.server import ActionServer, CancelResponse, ServerGoalHandle
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.clock import ClockType
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.time import Time

# Task Manager messages
from task_manager_msgs.action import Wait
from task_manager_msgs.srv import CancelTasks, StopTasks

# Task Manager
Expand All @@ -27,27 +36,40 @@
from task_manager.task_specs import TaskServerType, TaskSpecs


class StopTasksService:
class SystemTask(ABC): # pylint: disable=too-few-public-methods
"""Abstract class for system tasks."""

@staticmethod
@abstractmethod
def get_task_specs(topic: str) -> TaskSpecs:
"""Returns TaskSpecs object that describes the task properties."""


class StopTasksService(SystemTask):
"""Implements Stop-command."""

def __init__(self, node: Node, active_tasks: ActiveTasks):
self.node = node
self.active_tasks = active_tasks
def __init__(self, node: Node, topic: str, active_tasks: ActiveTasks):
self._node = node
self._topic = topic
self._active_tasks = active_tasks

self._node.create_service(
StopTasks, self._topic, self.service_cb, callback_group=MutuallyExclusiveCallbackGroup()
)

def service_cb(self, _request: StopTasks.Request, response: StopTasks.Response) -> StopTasks.Response:
"""Stops all the currently active tasks that have 'cancel_on_stop' field set to True."""
try:
self.active_tasks.cancel_tasks_on_stop()
self._active_tasks.cancel_tasks_on_stop()
response.success = True
except CancelTaskFailedError as e:
self.node.get_logger().error(f"Failed to stop some tasks on STOP command: {e}")
self._node.get_logger().error(f"Failed to stop some tasks on STOP command: {e}")
response.success = False

return response

@staticmethod
def get_task_specs(topic: str) -> TaskSpecs:
"""Returns TaskSpecs object that describes the task properties."""
return TaskSpecs(
task_name="system/stop",
blocking=False,
Expand All @@ -61,27 +83,32 @@ def get_task_specs(topic: str) -> TaskSpecs:
)


class CancelTasksService:
class CancelTasksService(SystemTask):
"""Cancel any task based on the task_id."""

def __init__(self, node: Node, active_tasks: ActiveTasks):
self.node = node
self.active_tasks = active_tasks
def __init__(self, node: Node, topic: str, active_tasks: ActiveTasks) -> None:
self._node = node
self._topic = topic
self._active_tasks = active_tasks

self._node.create_service(
CancelTasks, self._topic, self.service_cb, callback_group=MutuallyExclusiveCallbackGroup()
)

def service_cb(self, request: CancelTasks.Request, response: CancelTasks.Response) -> CancelTasks.Response:
"""Cancels the currently active tasks by given task_id."""
cancelled = []
response.success = True
for task_id in request.cancelled_tasks:
try:
self.active_tasks.cancel_task(task_id)
self._active_tasks.cancel_task(task_id)
except KeyError:
self.node.get_logger().warning(
self._node.get_logger().warning(
f"Tried to cancel a task with ID {task_id}, but the task is not active. "
f"Considering as a successful cancel."
)
except CancelTaskFailedError as e:
self.node.get_logger().error(f"Failed to cancel task with ID {task_id}: {e}")
self._node.get_logger().error(f"Failed to cancel task with ID {task_id}: {e}")
response.success = False
continue

Expand All @@ -92,7 +119,6 @@ def service_cb(self, request: CancelTasks.Request, response: CancelTasks.Respons

@staticmethod
def get_task_specs(topic: str) -> TaskSpecs:
"""Returns TaskSpecs object that describes the task properties."""
return TaskSpecs(
task_name="system/cancel_task",
blocking=False,
Expand All @@ -104,3 +130,73 @@ def get_task_specs(topic: str) -> TaskSpecs:
task_server_type=TaskServerType.SERVICE,
service_success_field="success",
)


class WaitTask(SystemTask): # pylint: disable=too-few-public-methods
"""Wait for a specified amount of time or until a cancel is called."""

def __init__(self, node: Node, topic: str):
self._node = node
self._topic = topic
self._server = ActionServer(
self._node,
Wait,
self._topic,
self._execute_cb,
cancel_callback=self._cancel_cb,
callback_group=MutuallyExclusiveCallbackGroup(),
)

def _execute_cb(self, goal_handle: ServerGoalHandle) -> Wait.Result:
"""Callback for the Wait action server."""
duration_in_seconds = goal_handle.request.duration
start_time = self._node.get_clock().now()

# Wait indefinitely if duration is 0.0
if duration_in_seconds <= 0.0:
loop_time = 0.1
end_time = Time(nanoseconds=sys.maxsize, clock_type=ClockType.ROS_TIME)
else:
loop_time = 0.1 if duration_in_seconds > 0.1 else duration_in_seconds
end_time = start_time + Duration(nanoseconds=int(duration_in_seconds * 1e9))

feedback_frequency_ns = int(1.0 * 1e9)
tanelikor marked this conversation as resolved.
Show resolved Hide resolved
last_feedback_stamp = start_time

while rclpy.ok() and self._node.get_clock().now() < end_time:
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Wait.Result()

if not goal_handle.is_active:
goal_handle.abort()
return Wait.Result()

# Publish feedback
if (self._node.get_clock().now() - last_feedback_stamp).nanoseconds >= feedback_frequency_ns:
last_feedback_stamp = self._node.get_clock().now()
goal_handle.publish_feedback(
Wait.Feedback(remaining_time=float((end_time.nanoseconds - last_feedback_stamp.nanoseconds) / 1e9))
)

time.sleep(loop_time)

goal_handle.succeed()
return Wait.Result()

@staticmethod
def _cancel_cb(_goal_handle: ServerGoalHandle) -> CancelResponse:
return CancelResponse.ACCEPT

@staticmethod
def get_task_specs(topic: str) -> TaskSpecs:
return TaskSpecs(
task_name="system/wait",
blocking=True,
cancel_on_stop=True,
topic=topic,
cancel_reported_as_success=False,
tanelikor marked this conversation as resolved.
Show resolved Hide resolved
reentrant=False,
msg_interface=Wait,
task_server_type=TaskServerType.ACTION,
)
23 changes: 19 additions & 4 deletions task_manager/test/integration_tests/task_manager_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from example_interfaces.srv import AddTwoInts

# Task Manager messages
from task_manager_msgs.action import ExecuteTask
from task_manager_msgs.action import ExecuteTask, Wait
from task_manager_msgs.msg import ActiveTaskArray
from task_manager_msgs.srv import CancelTasks, StopTasks

Expand Down Expand Up @@ -105,7 +105,7 @@ def setUp(self) -> None:
)
self._tasks_started = []

def _active_tasks_cb(self, active_tasks_msg):
def _active_tasks_cb(self, active_tasks_msg) -> None:
for active_task in active_tasks_msg.active_tasks:
if active_task.task_id not in self._tasks_started:
self._tasks_started.append(active_task.task_id)
Expand All @@ -124,7 +124,7 @@ def _get_response(future, timeout=5) -> ClientGoalHandle:
time.sleep(0.01)
return future.result()

def wait_for_task_start(self, task_id: str, timeout: float = 5.0):
def wait_for_task_start(self, task_id: str, timeout: float = 5.0) -> None:
"""Utility function to wait for the task to be started."""
start = time.time()
while task_id not in self._tasks_started:
Expand Down Expand Up @@ -154,7 +154,7 @@ def start_add_two_ints_service_task(
goal = create_add_two_ints_task_goal(task_id, task_name, run_time_secs, request)
return self._start_task(goal)

def _start_task(self, goal):
def _start_task(self, goal) -> ClientGoalHandle:
future: Future = self.execute_task_client.send_goal_async(goal)
goal_handle = self._get_response(future)
return goal_handle
Expand All @@ -175,6 +175,21 @@ def execute_cancel_task(self, task_ids: List[str]):
goal_handle = self._start_task(goal=goal)
return goal_handle.get_result()

def execute_wait_task(self, duration: float):
"""Calls system/wait with given time."""
wait_goal = Wait.Goal(duration=duration)
task_data = json.dumps(extract_values(wait_goal))
goal = ExecuteTask.Goal(task_name="system/wait", task_data=task_data, source="")
goal_handle = self._start_task(goal=goal)
return goal_handle.get_result()

def start_wait_task(self, duration: float, task_id: str = uuid.uuid4()) -> ClientGoalHandle:
"""Starts the wait task with given duration and returns the goal handle."""
wait_goal = Wait.Goal(duration=duration)
task_data = json.dumps(extract_values(wait_goal))
goal = ExecuteTask.Goal(task_id=task_id, task_name="system/wait", task_data=task_data, source="")
return self._start_task(goal)


class TestTasksNode(Node):
"""Launches test action servers, each with different behavior or task configuration."""
Expand Down
46 changes: 45 additions & 1 deletion task_manager/test/integration_tests/test_system_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_cancel_task_happy_flow(self) -> None:
cancel_response = self.execute_cancel_task(task_ids=["111"])
goal_handle.get_result()

self.assertTrue(cancel_response.status, GoalStatus.STATUS_SUCCEEDED)
self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED)
self.assertEqual(cancel_response.result.task_status, TaskStatus.DONE)

self.assertEqual(
Expand Down Expand Up @@ -101,6 +101,50 @@ def test_stop_task(self) -> None:
self.assertEqual(stop_response.result.task_result, json.dumps({"success": False}))
self.assertEqual(goal_handle.get_result().status, GoalStatus.STATUS_SUCCEEDED)

def test_wait_task(self) -> None:
"""Test cases for Wait system task."""
with self.subTest("Wait task is successful"):
wait_result = self.execute_wait_task(duration=0.5)
self.assertEqual(wait_result.result.task_status, TaskStatus.DONE)
self.assertEqual(wait_result.result.task_result, json.dumps({}))

with self.subTest("Wait task is cancelled"):
goal_handle = self.start_wait_task(duration=10.0, task_id="wait_cancel")
self.wait_for_task_start("wait_cancel")

cancel_response = self.execute_cancel_task(task_ids=["wait_cancel"])
wait_result = goal_handle.get_result()

self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED)
self.assertEqual(wait_result.status, GoalStatus.STATUS_ABORTED)
self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED)

with self.subTest("Input duration is negative - Wait indefinitely"):
goal_handle = self.start_wait_task(duration=-1.0, task_id="wait_negative")
self.wait_for_task_start("wait_negative")

# Cancel the task to not wait for an eternity
cancel_response = self.execute_cancel_task(task_ids=["wait_negative"])
wait_result = goal_handle.get_result()

self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED)
# If the task would not stay waiting, the status would be DONE
self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED)
self.assertEqual(wait_result.result.task_result, json.dumps({}))

with self.subTest("Input duration is zero - Wait indefinitely"):
goal_handle = self.start_wait_task(duration=0.0, task_id="wait_indef_cancel")
self.wait_for_task_start("wait_indef_cancel")

# Cancel the task to not wait for an eternity
cancel_response = self.execute_cancel_task(task_ids=["wait_indef_cancel"])
wait_result = goal_handle.get_result()

self.assertEqual(cancel_response.status, GoalStatus.STATUS_SUCCEEDED)
# If the task would not stay waiting with duration 0.0, the status would be DONE
self.assertEqual(wait_result.result.task_status, TaskStatus.CANCELED)
self.assertEqual(wait_result.result.task_result, json.dumps({}))


if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions task_manager_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(srv_files
set(action_files
"action/Mission.action"
"action/ExecuteTask.action"
"action/Wait.action"
)

rosidl_generate_interfaces(${PROJECT_NAME}
Expand Down
4 changes: 4 additions & 0 deletions task_manager_msgs/action/Wait.action
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
float32 duration
---
---
float64 remaining_time
Loading