Skip to content

Commit

Permalink
feat: Skeleton State Machine Factory (LASR-at-Home#196)
Browse files Browse the repository at this point in the history
* fix: incorrect remapping keys for handover

* fix: remove speech server in launch file

* feat: skeleton state machine factory
  • Loading branch information
m-barker authored May 7, 2024
1 parent 52dfe45 commit 7195e86
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 75 deletions.
4 changes: 2 additions & 2 deletions tasks/gpsr/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import Dict
from gpsr.load_known_data import GPSRDataLoader
from gpsr.state_machine_factory import build_state_machine
from gpsr.state_machine_factory import GPSRStateMachineFactory
from gpsr.regex_command_parser import Configuration
from gpsr.states import CommandParserStateMachine

Expand Down Expand Up @@ -34,7 +34,7 @@ def main():
command_parser_sm.execute()
parsed_command: Dict = command_parser_sm.userdata.parsed_command
rospy.loginfo(f"Parsed command: {parsed_command}")
sm = build_state_machine(parsed_command)
sm = GPSRStateMachineFactory(parsed_command).build_state_machine()
sm.execute()


Expand Down
6 changes: 3 additions & 3 deletions tasks/gpsr/src/gpsr/regex_command_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def list_to_regex(list: List[str], key: Union[str, None] = None):
"greet": ["greet", "salute", "say hello to", "introduce yourself to"],
# "remember": ["meet", "contact", "get to know", "get acquainted with"], <--- LOOKS UNUSED
"count": ["tell me how many"],
"describe": ["tell me how", "describe"],
"offer": ["offer"],
# "describe": ["tell me how", "describe"], <---- LOOKS UNUSED
# "offer": ["offer"], <---- LOOKS UNUSED
"follow": ["follow"],
"guide": ["guide", "escort", "take", "lead"],
"accompany": ["accompany"],
# "accompany": ["accompany"], <---- LOOKS UNUSED
}


Expand Down
213 changes: 143 additions & 70 deletions tasks/gpsr/src/gpsr/state_machine_factory.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,151 @@
#!/usr/bin/env python3
import rospy
import smach
from smach_ros import ServiceState
from typing import Dict, List
from lasr_skills import GoToLocation, FindNamedPerson
from lasr_skills import GoToLocation, FindNamedPerson # type: ignore
from gpsr.states import Talk

STATE_COUNT = 0


def increment_state_count() -> int:
global STATE_COUNT
STATE_COUNT += 1
return STATE_COUNT


def build_state_machine(parsed_command: Dict) -> smach.StateMachine:
"""Constructs the parameterized state machine for the GPSR task,
given the parsed command.
Args:
parsed_command (Dict): parsed command.
Returns:
smach.StateMachine: paramaterized state machine ready to be executed.
"""
command_verbs: List[str] = parsed_command["commands"]
command_params: List[Dict] = parsed_command["command_params"]
sm = smach.StateMachine(outcomes=["succeeded", "failed"])
with sm:
for command_verb, command_param in zip(command_verbs, command_params):
if command_verb == "greet":
if "name" in command_param:
location_param_room = (
f"/gpsr/arena/rooms/{command_param['location']}"
)
location_param_pose = f"{location_param_room}/pose"
sm.add(
f"STATE_{increment_state_count()}",
GoToLocation(location_param=location_param_pose),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
sm.add(
f"STATE_{increment_state_count()}",
FindNamedPerson(
name=command_param["name"],
location_param=location_param_room,
),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
elif "clothes" in command_param:
pass
else:
raise ValueError(
"Greet command received with no name or clothes in command parameters"
)
elif command_verb == "talk":
if "gesture" in command_param:
pass
elif "talk" in command_param:
sm.add(
f"STATE_{increment_state_count()}",
Talk(command_param["talk"]),
transitions={"succeeded": "succeeded", "failed": "failed"},
)

class GPSRStateMachineFactory:
def __init__(self, parsed_command: Dict):
"""Stores the parsed command, initializes the state count,
and initalises the state machine.
Args:
parsed_command (Dict): parsed command output by the
command parser.
"""
self.state_count: int = 0
self.parsed_command: dict = parsed_command
self.sm = smach.StateMachine(outcomes=["succeeded", "failed"])

def _increment_state_count(self):
self.state_count += 1
return self.state_count

def _handle_take_command(self, command_param: Dict):
raise NotImplementedError("Take command not implemented")

def _handle_place_command(self, command_param: Dict):
raise NotImplementedError("Place command not implemented")

def _handle_deliver_command(self, command_param: Dict):
raise NotImplementedError("Deliver command not implemented")

def _handle_bring_command(self, command_param: Dict):
raise NotImplementedError("Bring command not implemented")

def _handle_go_command(self, command_param: Dict):
raise NotImplementedError("Go command not implemented")

def _handle_find_command(self, command_param: Dict):
raise NotImplementedError("Find command not implemented")

def _handle_talk_command(self, command_param: Dict):
if "gesture" in command_param:
raise NotImplementedError("Talk command with gesture not implemented")
elif "text" in command_param:
self.sm.add(
f"STATE_{self._increment_state_count()}",
Talk(text=command_param["text"]),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
else:
raise ValueError(
"Talk command received with no text or gesture in command parameters"
)

def _handle_answer_command(self, command_param: Dict):
raise NotImplementedError("Answer command not implemented")

def _handle_meet_command(self, command_param: Dict):
raise NotImplementedError("Meet command not implemented")

def _handle_tell_command(self, command_param: Dict):
raise NotImplementedError("Tell command not implemented")

def _handle_greet_command(self, command_param: Dict):
if "name" in command_param:
location_param_room = f"/gpsr/arena/rooms/{command_param['location']}"
location_param_pose = f"{location_param_room}/pose"
self.sm.add(
f"STATE_{self._increment_state_count()}",
GoToLocation(location_param=location_param_pose),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
self.sm.add(
f"STATE_{self._increment_state_count()}",
FindNamedPerson(
name=command_param["name"],
location_param=location_param_room,
),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
elif "clothes" in command_param:
raise NotImplementedError("Greet command with clothes not implemented")
else:
raise ValueError(
"Greet command received with no name or clothes in command parameters"
)

def _handle_count_command(self, command_params: Dict):
raise NotImplementedError("Count command not implemented")

def _handle_follow_command(self, command_params: Dict):
raise NotImplementedError("Follow command not implemented")

def _handle_guide_command(self, command_params: Dict):
raise NotImplementedError("Guide command not implemented")

def build_state_machine(self) -> smach.StateMachine:
with self.sm:
command_verbs: List[str] = self.parsed_command["commands"]
command_params: List[Dict] = self.parsed_command["command_params"]
for command_verb, command_param in zip(command_verbs, command_params):
if command_verb == "take":
self._handle_take_command(command_param)
elif command_verb == "place":
self._handle_place_command(command_param)
elif command_verb == "deliver":
self._handle_deliver_command(command_param)
elif command_verb == "bring":
self._handle_bring_command(command_param)
elif command_verb == "go":
self._handle_go_command(command_param)
elif command_verb == "find":
self._handle_find_command(command_param)
elif command_verb == "talk":
self._handle_talk_command(command_param)
elif command_verb == "answer":
self._handle_answer_command(command_param)
elif command_verb == "meet":
self._handle_meet_command(command_param)
elif command_verb == "tell":
self._handle_tell_command(command_param)
elif command_verb == "greet":
self._handle_greet_command(command_param)
elif command_verb == "count":
self._handle_count_command(command_param)
elif command_verb == "follow":
self._handle_follow_command(command_param)
elif command_verb == "guide":
self._handle_guide_command(command_param)
else:
raise ValueError(
"Talk command received with no gesture or talk in command parameters"
)
raise ValueError(f"Invalid command verb: {command_verb}")

self.sm.add(
f"STATE_{self.state_count}",
Talk(text="I have completed the task."),
transitions={"succeeded": "succeeded", "failed": "failed"},
)

return sm
return self.sm

0 comments on commit 7195e86

Please sign in to comment.