From 7195e868b9179ff9866de1af17ecfb9e51dc5749 Mon Sep 17 00:00:00 2001 From: Matt Barker <105945282+m-barker@users.noreply.github.com> Date: Tue, 7 May 2024 15:12:33 +0100 Subject: [PATCH] feat: Skeleton State Machine Factory (#196) * fix: incorrect remapping keys for handover * fix: remove speech server in launch file * feat: skeleton state machine factory --- tasks/gpsr/scripts/main.py | 4 +- tasks/gpsr/src/gpsr/regex_command_parser.py | 6 +- tasks/gpsr/src/gpsr/state_machine_factory.py | 213 +++++++++++++------ 3 files changed, 148 insertions(+), 75 deletions(-) diff --git a/tasks/gpsr/scripts/main.py b/tasks/gpsr/scripts/main.py index 4c0e15f39..780b91a4e 100644 --- a/tasks/gpsr/scripts/main.py +++ b/tasks/gpsr/scripts/main.py @@ -4,7 +4,7 @@ import sys from typing import Dict from gpsr.load_known_data import GPSRDataLoader -from gpsr.state_machine_factory import build_state_machine +from gpsr.state_machine_factory import GPSRStateMachineFactory from gpsr.regex_command_parser import Configuration from gpsr.states import CommandParserStateMachine @@ -34,7 +34,7 @@ def main(): command_parser_sm.execute() parsed_command: Dict = command_parser_sm.userdata.parsed_command rospy.loginfo(f"Parsed command: {parsed_command}") - sm = build_state_machine(parsed_command) + sm = GPSRStateMachineFactory(parsed_command).build_state_machine() sm.execute() diff --git a/tasks/gpsr/src/gpsr/regex_command_parser.py b/tasks/gpsr/src/gpsr/regex_command_parser.py index ea091f052..88777e44d 100644 --- a/tasks/gpsr/src/gpsr/regex_command_parser.py +++ b/tasks/gpsr/src/gpsr/regex_command_parser.py @@ -36,11 +36,11 @@ def list_to_regex(list: List[str], key: Union[str, None] = None): "greet": ["greet", "salute", "say hello to", "introduce yourself to"], # "remember": ["meet", "contact", "get to know", "get acquainted with"], <--- LOOKS UNUSED "count": ["tell me how many"], - "describe": ["tell me how", "describe"], - "offer": ["offer"], + # "describe": ["tell me how", "describe"], <---- LOOKS UNUSED + # "offer": ["offer"], <---- LOOKS UNUSED "follow": ["follow"], "guide": ["guide", "escort", "take", "lead"], - "accompany": ["accompany"], + # "accompany": ["accompany"], <---- LOOKS UNUSED } diff --git a/tasks/gpsr/src/gpsr/state_machine_factory.py b/tasks/gpsr/src/gpsr/state_machine_factory.py index d7bc33e4d..0208686dd 100644 --- a/tasks/gpsr/src/gpsr/state_machine_factory.py +++ b/tasks/gpsr/src/gpsr/state_machine_factory.py @@ -1,78 +1,151 @@ #!/usr/bin/env python3 import rospy import smach -from smach_ros import ServiceState from typing import Dict, List -from lasr_skills import GoToLocation, FindNamedPerson +from lasr_skills import GoToLocation, FindNamedPerson # type: ignore from gpsr.states import Talk -STATE_COUNT = 0 - - -def increment_state_count() -> int: - global STATE_COUNT - STATE_COUNT += 1 - return STATE_COUNT - - -def build_state_machine(parsed_command: Dict) -> smach.StateMachine: - """Constructs the parameterized state machine for the GPSR task, - given the parsed command. - - Args: - parsed_command (Dict): parsed command. - - Returns: - smach.StateMachine: paramaterized state machine ready to be executed. - """ - command_verbs: List[str] = parsed_command["commands"] - command_params: List[Dict] = parsed_command["command_params"] - sm = smach.StateMachine(outcomes=["succeeded", "failed"]) - with sm: - for command_verb, command_param in zip(command_verbs, command_params): - if command_verb == "greet": - if "name" in command_param: - location_param_room = ( - f"/gpsr/arena/rooms/{command_param['location']}" - ) - location_param_pose = f"{location_param_room}/pose" - sm.add( - f"STATE_{increment_state_count()}", - GoToLocation(location_param=location_param_pose), - transitions={ - "succeeded": f"STATE_{STATE_COUNT + 1}", - "failed": "failed", - }, - ) - sm.add( - f"STATE_{increment_state_count()}", - FindNamedPerson( - name=command_param["name"], - location_param=location_param_room, - ), - transitions={ - "succeeded": f"STATE_{STATE_COUNT + 1}", - "failed": "failed", - }, - ) - elif "clothes" in command_param: - pass - else: - raise ValueError( - "Greet command received with no name or clothes in command parameters" - ) - elif command_verb == "talk": - if "gesture" in command_param: - pass - elif "talk" in command_param: - sm.add( - f"STATE_{increment_state_count()}", - Talk(command_param["talk"]), - transitions={"succeeded": "succeeded", "failed": "failed"}, - ) + +class GPSRStateMachineFactory: + def __init__(self, parsed_command: Dict): + """Stores the parsed command, initializes the state count, + and initalises the state machine. + + Args: + parsed_command (Dict): parsed command output by the + command parser. + """ + self.state_count: int = 0 + self.parsed_command: dict = parsed_command + self.sm = smach.StateMachine(outcomes=["succeeded", "failed"]) + + def _increment_state_count(self): + self.state_count += 1 + return self.state_count + + def _handle_take_command(self, command_param: Dict): + raise NotImplementedError("Take command not implemented") + + def _handle_place_command(self, command_param: Dict): + raise NotImplementedError("Place command not implemented") + + def _handle_deliver_command(self, command_param: Dict): + raise NotImplementedError("Deliver command not implemented") + + def _handle_bring_command(self, command_param: Dict): + raise NotImplementedError("Bring command not implemented") + + def _handle_go_command(self, command_param: Dict): + raise NotImplementedError("Go command not implemented") + + def _handle_find_command(self, command_param: Dict): + raise NotImplementedError("Find command not implemented") + + def _handle_talk_command(self, command_param: Dict): + if "gesture" in command_param: + raise NotImplementedError("Talk command with gesture not implemented") + elif "text" in command_param: + self.sm.add( + f"STATE_{self._increment_state_count()}", + Talk(text=command_param["text"]), + transitions={ + "succeeded": f"STATE_{self.state_count + 1}", + "failed": "failed", + }, + ) + else: + raise ValueError( + "Talk command received with no text or gesture in command parameters" + ) + + def _handle_answer_command(self, command_param: Dict): + raise NotImplementedError("Answer command not implemented") + + def _handle_meet_command(self, command_param: Dict): + raise NotImplementedError("Meet command not implemented") + + def _handle_tell_command(self, command_param: Dict): + raise NotImplementedError("Tell command not implemented") + + def _handle_greet_command(self, command_param: Dict): + if "name" in command_param: + location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" + location_param_pose = f"{location_param_room}/pose" + self.sm.add( + f"STATE_{self._increment_state_count()}", + GoToLocation(location_param=location_param_pose), + transitions={ + "succeeded": f"STATE_{self.state_count + 1}", + "failed": "failed", + }, + ) + self.sm.add( + f"STATE_{self._increment_state_count()}", + FindNamedPerson( + name=command_param["name"], + location_param=location_param_room, + ), + transitions={ + "succeeded": f"STATE_{self.state_count + 1}", + "failed": "failed", + }, + ) + elif "clothes" in command_param: + raise NotImplementedError("Greet command with clothes not implemented") + else: + raise ValueError( + "Greet command received with no name or clothes in command parameters" + ) + + def _handle_count_command(self, command_params: Dict): + raise NotImplementedError("Count command not implemented") + + def _handle_follow_command(self, command_params: Dict): + raise NotImplementedError("Follow command not implemented") + + def _handle_guide_command(self, command_params: Dict): + raise NotImplementedError("Guide command not implemented") + + def build_state_machine(self) -> smach.StateMachine: + with self.sm: + command_verbs: List[str] = self.parsed_command["commands"] + command_params: List[Dict] = self.parsed_command["command_params"] + for command_verb, command_param in zip(command_verbs, command_params): + if command_verb == "take": + self._handle_take_command(command_param) + elif command_verb == "place": + self._handle_place_command(command_param) + elif command_verb == "deliver": + self._handle_deliver_command(command_param) + elif command_verb == "bring": + self._handle_bring_command(command_param) + elif command_verb == "go": + self._handle_go_command(command_param) + elif command_verb == "find": + self._handle_find_command(command_param) + elif command_verb == "talk": + self._handle_talk_command(command_param) + elif command_verb == "answer": + self._handle_answer_command(command_param) + elif command_verb == "meet": + self._handle_meet_command(command_param) + elif command_verb == "tell": + self._handle_tell_command(command_param) + elif command_verb == "greet": + self._handle_greet_command(command_param) + elif command_verb == "count": + self._handle_count_command(command_param) + elif command_verb == "follow": + self._handle_follow_command(command_param) + elif command_verb == "guide": + self._handle_guide_command(command_param) else: - raise ValueError( - "Talk command received with no gesture or talk in command parameters" - ) + raise ValueError(f"Invalid command verb: {command_verb}") + + self.sm.add( + f"STATE_{self.state_count}", + Talk(text="I have completed the task."), + transitions={"succeeded": "succeeded", "failed": "failed"}, + ) - return sm + return self.sm