Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skeleton State Machine Factory #196

Merged
merged 10 commits into from
May 7, 2024
4 changes: 2 additions & 2 deletions tasks/gpsr/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import Dict
from gpsr.load_known_data import GPSRDataLoader
from gpsr.state_machine_factory import build_state_machine
from gpsr.state_machine_factory import GPSRStateMachineFactory
from gpsr.regex_command_parser import Configuration
from gpsr.states import CommandParserStateMachine

Expand Down Expand Up @@ -34,7 +34,7 @@ def main():
command_parser_sm.execute()
parsed_command: Dict = command_parser_sm.userdata.parsed_command
rospy.loginfo(f"Parsed command: {parsed_command}")
sm = build_state_machine(parsed_command)
sm = GPSRStateMachineFactory(parsed_command).build_state_machine()
sm.execute()


Expand Down
6 changes: 3 additions & 3 deletions tasks/gpsr/src/gpsr/regex_command_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def list_to_regex(list: List[str], key: Union[str, None] = None):
"greet": ["greet", "salute", "say hello to", "introduce yourself to"],
# "remember": ["meet", "contact", "get to know", "get acquainted with"], <--- LOOKS UNUSED
"count": ["tell me how many"],
"describe": ["tell me how", "describe"],
"offer": ["offer"],
# "describe": ["tell me how", "describe"], <---- LOOKS UNUSED
# "offer": ["offer"], <---- LOOKS UNUSED
"follow": ["follow"],
"guide": ["guide", "escort", "take", "lead"],
"accompany": ["accompany"],
# "accompany": ["accompany"], <---- LOOKS UNUSED
}


Expand Down
213 changes: 143 additions & 70 deletions tasks/gpsr/src/gpsr/state_machine_factory.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,151 @@
#!/usr/bin/env python3
import rospy
import smach
from smach_ros import ServiceState
from typing import Dict, List
from lasr_skills import GoToLocation, FindNamedPerson
from lasr_skills import GoToLocation, FindNamedPerson # type: ignore
from gpsr.states import Talk

STATE_COUNT = 0


def increment_state_count() -> int:
global STATE_COUNT
STATE_COUNT += 1
return STATE_COUNT


def build_state_machine(parsed_command: Dict) -> smach.StateMachine:
"""Constructs the parameterized state machine for the GPSR task,
given the parsed command.

Args:
parsed_command (Dict): parsed command.

Returns:
smach.StateMachine: paramaterized state machine ready to be executed.
"""
command_verbs: List[str] = parsed_command["commands"]
command_params: List[Dict] = parsed_command["command_params"]
sm = smach.StateMachine(outcomes=["succeeded", "failed"])
with sm:
for command_verb, command_param in zip(command_verbs, command_params):
if command_verb == "greet":
if "name" in command_param:
location_param_room = (
f"/gpsr/arena/rooms/{command_param['location']}"
)
location_param_pose = f"{location_param_room}/pose"
sm.add(
f"STATE_{increment_state_count()}",
GoToLocation(location_param=location_param_pose),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
sm.add(
f"STATE_{increment_state_count()}",
FindNamedPerson(
name=command_param["name"],
location_param=location_param_room,
),
transitions={
"succeeded": f"STATE_{STATE_COUNT + 1}",
"failed": "failed",
},
)
elif "clothes" in command_param:
pass
else:
raise ValueError(
"Greet command received with no name or clothes in command parameters"
)
elif command_verb == "talk":
if "gesture" in command_param:
pass
elif "talk" in command_param:
sm.add(
f"STATE_{increment_state_count()}",
Talk(command_param["talk"]),
transitions={"succeeded": "succeeded", "failed": "failed"},
)

class GPSRStateMachineFactory:
def __init__(self, parsed_command: Dict):
"""Stores the parsed command, initializes the state count,
and initalises the state machine.

Args:
parsed_command (Dict): parsed command output by the
command parser.
"""
self.state_count: int = 0
self.parsed_command: dict = parsed_command
self.sm = smach.StateMachine(outcomes=["succeeded", "failed"])

def _increment_state_count(self):
self.state_count += 1
return self.state_count

def _handle_take_command(self, command_param: Dict):
raise NotImplementedError("Take command not implemented")

def _handle_place_command(self, command_param: Dict):
raise NotImplementedError("Place command not implemented")

def _handle_deliver_command(self, command_param: Dict):
raise NotImplementedError("Deliver command not implemented")

def _handle_bring_command(self, command_param: Dict):
raise NotImplementedError("Bring command not implemented")

def _handle_go_command(self, command_param: Dict):
raise NotImplementedError("Go command not implemented")

def _handle_find_command(self, command_param: Dict):
raise NotImplementedError("Find command not implemented")

def _handle_talk_command(self, command_param: Dict):
if "gesture" in command_param:
raise NotImplementedError("Talk command with gesture not implemented")
elif "text" in command_param:
self.sm.add(
f"STATE_{self._increment_state_count()}",
Talk(text=command_param["text"]),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
else:
raise ValueError(
"Talk command received with no text or gesture in command parameters"
)

def _handle_answer_command(self, command_param: Dict):
raise NotImplementedError("Answer command not implemented")

def _handle_meet_command(self, command_param: Dict):
raise NotImplementedError("Meet command not implemented")

def _handle_tell_command(self, command_param: Dict):
raise NotImplementedError("Tell command not implemented")

def _handle_greet_command(self, command_param: Dict):
if "name" in command_param:
location_param_room = f"/gpsr/arena/rooms/{command_param['location']}"
location_param_pose = f"{location_param_room}/pose"
self.sm.add(
f"STATE_{self._increment_state_count()}",
GoToLocation(location_param=location_param_pose),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
self.sm.add(
f"STATE_{self._increment_state_count()}",
FindNamedPerson(
name=command_param["name"],
location_param=location_param_room,
),
transitions={
"succeeded": f"STATE_{self.state_count + 1}",
"failed": "failed",
},
)
elif "clothes" in command_param:
raise NotImplementedError("Greet command with clothes not implemented")
else:
raise ValueError(
"Greet command received with no name or clothes in command parameters"
)

def _handle_count_command(self, command_params: Dict):
raise NotImplementedError("Count command not implemented")

def _handle_follow_command(self, command_params: Dict):
raise NotImplementedError("Follow command not implemented")

def _handle_guide_command(self, command_params: Dict):
raise NotImplementedError("Guide command not implemented")

def build_state_machine(self) -> smach.StateMachine:
with self.sm:
command_verbs: List[str] = self.parsed_command["commands"]
command_params: List[Dict] = self.parsed_command["command_params"]
for command_verb, command_param in zip(command_verbs, command_params):
if command_verb == "take":
self._handle_take_command(command_param)
elif command_verb == "place":
self._handle_place_command(command_param)
elif command_verb == "deliver":
self._handle_deliver_command(command_param)
elif command_verb == "bring":
self._handle_bring_command(command_param)
elif command_verb == "go":
self._handle_go_command(command_param)
elif command_verb == "find":
self._handle_find_command(command_param)
elif command_verb == "talk":
self._handle_talk_command(command_param)
elif command_verb == "answer":
self._handle_answer_command(command_param)
elif command_verb == "meet":
self._handle_meet_command(command_param)
elif command_verb == "tell":
self._handle_tell_command(command_param)
elif command_verb == "greet":
self._handle_greet_command(command_param)
elif command_verb == "count":
self._handle_count_command(command_param)
elif command_verb == "follow":
self._handle_follow_command(command_param)
elif command_verb == "guide":
self._handle_guide_command(command_param)
else:
raise ValueError(
"Talk command received with no gesture or talk in command parameters"
)
raise ValueError(f"Invalid command verb: {command_verb}")

self.sm.add(
f"STATE_{self.state_count}",
Talk(text="I have completed the task."),
transitions={"succeeded": "succeeded", "failed": "failed"},
)

return sm
return self.sm