diff --git a/common/navigation/lasr_person_following/nodes/person_following.py b/common/navigation/lasr_person_following/nodes/person_following.py
index 6c2731e4a..55c467809 100644
--- a/common/navigation/lasr_person_following/nodes/person_following.py
+++ b/common/navigation/lasr_person_following/nodes/person_following.py
@@ -31,7 +31,7 @@ def __init__(self) -> None:
self._server.start()
def _execute_cb(self, _: FollowGoal) -> None:
- while not self._follower.begin_tracking():
+ while not self._follower.begin_tracking(ask=False):
rospy.logwarn("No people found, retrying...")
rospy.sleep(rospy.Duration(1.0))
warnings.warn(
@@ -48,6 +48,6 @@ def _preempt_cb(self) -> None:
if __name__ == "__main__":
rospy.init_node("person_following_server")
- rospy.loginfo("Person following server started")
server = PersonFollowingServer()
+ rospy.loginfo("Person following server started")
rospy.spin()
diff --git a/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py b/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py
index 927c74fea..b6c35f807 100644
--- a/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py
+++ b/common/navigation/lasr_person_following/src/lasr_person_following/person_following.py
@@ -8,6 +8,7 @@
PoseStamped,
Quaternion,
PoseArray,
+ Point,
)
from typing import Union, List
@@ -41,14 +42,12 @@ class PersonFollower:
# Parameters
_start_following_radius: float
_start_following_angle: float
- _min_distance_between_tracks: float
- _n_secs_static: float
- _min_time_between_goals: float
+ _n_secs_static_finished: float
+ _n_secs_static_plan_close: float
_new_goal_threshold: float
_stopping_distance: float
# State
- _latest_tracks: Union[None, PersonArray]
_track_id: Union[None, int]
# Action clients
@@ -65,67 +64,61 @@ class PersonFollower:
_buffer: tf.Buffer
_listener: tf.TransformListener
- # Subscribers
- _track_sub: rospy.Subscriber
-
# Publishers
- _goal_pose_pub: rospy.Publisher
+ _person_trajectory_pub: rospy.Publisher
def __init__(
self,
start_following_radius: float = 2.0,
start_following_angle: float = 45.0,
- min_distance_between_tracks: float = 0.5,
- n_secs_static: float = 15.0,
- min_time_between_goals: float = 1.0,
+ n_secs_static_finished: float = 10.0,
+ n_secs_static_plan_close: float = 5.0,
new_goal_threshold: float = 0.5,
stopping_distance: float = 1.0,
):
self._start_following_radius = start_following_radius
self._start_following_angle = start_following_angle
- self._min_distance_between_tracks = min_distance_between_tracks
- self._n_secs_static = n_secs_static
- self._min_time_between_goals = min_time_between_goals
+ self._n_secs_static_finished = n_secs_static_finished
+ self._n_secs_static_plan_close = n_secs_static_plan_close
self._new_goal_threshold = new_goal_threshold
self._stopping_distance = stopping_distance
- self._latest_tracks = None
self._track_id = None
self._move_base_client = actionlib.SimpleActionClient(
"move_base", MoveBaseAction
)
+ if not self._move_base_client.wait_for_server(rospy.Duration.from_sec(10.0)):
+ rospy.logwarn("Move base client not available")
- self._buffer = tf.Buffer(cache_time=rospy.Duration(10.0))
+ self._buffer = tf.Buffer(cache_time=rospy.Duration.from_sec(10.0))
self._listener = tf.TransformListener(self._buffer)
- self._goal_pose_pub = rospy.Publisher(
- "/follow_poses", PoseArray, queue_size=1000000, latch=True
+ rospy.wait_for_service(
+            "/move_base/make_plan", timeout=10.0  # wait_for_service takes float seconds, not Duration
)
+ self._make_plan = rospy.ServiceProxy("/move_base/make_plan", GetPlan)
- self._track_sub = rospy.Subscriber(
- "/people_tracked", PersonArray, self._track_callback
+ self._person_trajectory_pub = rospy.Publisher(
+ "/person_trajectory", PoseArray, queue_size=1, latch=True
)
- """
- Apparently, /move_base/NavfnROS/make_plan can be called while move_base is active https://github.com/ros-planning/navigation/issues/12
- """
- rospy.wait_for_service("/move_base/make_plan")
- self._make_plan = rospy.ServiceProxy("/move_base/make_plan", GetPlan)
-
- self._tts_client = actionlib.SimpleActionClient("/tts", TtsAction)
+ self._tts_client = actionlib.SimpleActionClient("tts", TtsAction)
self._tts_client_available = self._tts_client.wait_for_server(
- rospy.Duration(10.0)
+ rospy.Duration.from_sec(10.0)
)
+ if not self._tts_client_available:
+ rospy.logwarn("TTS client not available")
self._transcribe_speech_client = actionlib.SimpleActionClient(
"transcribe_speech", TranscribeSpeechAction
)
self._transcribe_speech_client_available = (
- self._transcribe_speech_client.wait_for_server(rospy.Duration(10.0))
+ self._transcribe_speech_client.wait_for_server(
+ rospy.Duration.from_sec(10.0)
+ )
)
-
- def _track_callback(self, msg: PersonArray) -> None:
- self._latest_tracks = msg
+ if not self._transcribe_speech_client_available:
+ rospy.logwarn("Transcribe speech client not available")
def _tf_pose(self, pose: PoseStamped, target_frame: str):
trans = self._buffer.lookup_transform(
@@ -133,37 +126,39 @@ def _tf_pose(self, pose: PoseStamped, target_frame: str):
)
return do_transform_pose(pose, trans)
- def _robot_pose_in_odom(self) -> PoseStamped:
- current_pose: PoseWithCovarianceStamped = rospy.wait_for_message(
- "/amcl_pose", PoseWithCovarianceStamped
- )
+ def _robot_pose_in_odom(self) -> Union[PoseStamped, None]:
+ try:
+ current_pose: PoseWithCovarianceStamped = rospy.wait_for_message(
+ "/amcl_pose", PoseWithCovarianceStamped
+ )
+        except (AttributeError, rospy.ROSException):
+ return None
+
current_pose_stamped = PoseStamped(
pose=current_pose.pose.pose, header=current_pose.header
)
return self._tf_pose(current_pose_stamped, "odom")
- def begin_tracking(self) -> bool:
+ def begin_tracking(self, ask: bool = False) -> bool:
"""
Chooses the closest person as the target
"""
- while self._latest_tracks is None and not rospy.is_shutdown():
- rospy.loginfo("Waiting for people to be tracked")
- rospy.sleep(1)
-
- if self._latest_tracks is None:
- rospy.loginfo("No people to track")
- return False
- people: PersonArray = self._latest_tracks
+ tracks: PersonArray = rospy.wait_for_message("/people_tracked", PersonArray)
+ people: List[Person] = tracks.people
- if len(people.people) == 0:
+ if len(people) == 0:
return False
min_dist: float = np.inf
closest_person: Union[None, Person] = None
robot_pose: PoseStamped = self._robot_pose_in_odom()
- for person in people.people:
+
+ if robot_pose is None:
+ return False
+
+ for person in people:
dist: float = self._euclidian_distance(person.pose, robot_pose.pose)
face_quat: Quaternion = self._compute_face_quat(
@@ -186,24 +181,12 @@ def begin_tracking(self) -> bool:
if not closest_person:
return False
- # Face the person
- pose: PoseStamped = PoseStamped(header=people.header)
- pose.pose = robot_pose.pose
- pose.pose.orientation = self._compute_face_quat(
- robot_pose.pose, closest_person.pose
- )
- pose.header.stamp = rospy.Time.now()
- pose = self._tf_pose(pose, "map")
- self._move_base(pose)
+ if ask:
+ if self._tts_client_available and self._transcribe_speech_client_available:
- if self._tts_client_available:
- # Ask if we should follow
- tts_goal: TtsGoal = TtsGoal()
- tts_goal.rawtext.text = "Should I follow you? Please answer yes or no"
- tts_goal.rawtext.lang_id = "en_GB"
- self._tts_client.send_goal_and_wait(tts_goal)
+ # Ask if we should follow
+ self._tts("Should I follow you? Please answer yes or no", wait=True)
- if self._transcribe_speech_client_available:
# listen
self._transcribe_speech_client.send_goal_and_wait(
TranscribeSpeechGoal()
@@ -213,12 +196,7 @@ def begin_tracking(self) -> bool:
if "yes" not in transcription.lower():
return False
- tts_goal: TtsGoal = TtsGoal()
- tts_goal.rawtext.text = (
- "Okay, I'll begin following you. Please walk slowly."
- )
- tts_goal.rawtext.lang_id = "en_GB"
- self._tts_client.send_goal_and_wait(tts_goal)
+ self._tts("I will follow you", wait=False)
self._track_id = closest_person.id
@@ -227,12 +205,9 @@ def begin_tracking(self) -> bool:
def _recover_track(self) -> bool:
if self._tts_client_available:
- tts_goal: TtsGoal = TtsGoal()
- tts_goal.rawtext.text = "I've lost you, please come back"
- tts_goal.rawtext.lang_id = "en_GB"
- self._tts_client.send_goal_and_wait(tts_goal)
+ self._tts("I lost track of you, please come back", wait=True)
- while not self.begin_tracking() and not rospy.is_shutdown():
+ while not self.begin_tracking(ask=True) and not rospy.is_shutdown():
rospy.loginfo("Recovering track...")
rospy.sleep(1)
@@ -272,15 +247,11 @@ def _cancel_goal(self) -> None:
GoalStatus.ACTIVE,
]:
self._move_base_client.cancel_goal()
+ self._move_base_client.wait_for_result()
def _check_finished(self) -> bool:
if self._tts_client_available:
- tts_goal: TtsGoal = TtsGoal()
- tts_goal.rawtext.text = (
- "Should I stop following you? Please answer yes or no"
- )
- tts_goal.rawtext.lang_id = "en_GB"
- self._tts_client.send_goal_and_wait(tts_goal)
+ self._tts("Have we arrived?", wait=True)
if self._transcribe_speech_client_available:
self._transcribe_speech_client.send_goal_and_wait(
@@ -288,15 +259,15 @@ def _check_finished(self) -> bool:
)
transcription = self._transcribe_speech_client.get_result().sequence
- if "yes" in transcription.lower():
- return True
+ return "yes" in transcription.lower()
- return False
+ return True
return True
def _get_pose_on_path(
self, start_pose: PoseStamped, goal_pose: PoseStamped, dist_to_goal: float
) -> Union[None, PoseStamped]:
+ start: rospy.Time = rospy.Time.now()
assert (
start_pose.header.frame_id == goal_pose.header.frame_id
), "Start and goal poses must be in the same frame"
@@ -306,7 +277,8 @@ def _get_pose_on_path(
# make a plan from start to goal
try:
plan = self._make_plan(start_pose, goal_pose, self._stopping_distance).plan
- except rospy.ServiceException:
+ except rospy.ServiceException as e:
+ rospy.loginfo(e)
return chosen_pose
# select a pose that is dist_to_goal away from the goal_pose
@@ -314,58 +286,55 @@ def _get_pose_on_path(
if self._euclidian_distance(pose.pose, goal_pose.pose) >= dist_to_goal:
chosen_pose = pose
break
+ end: rospy.Time = rospy.Time.now()
+
+        rospy.logwarn(f"Time taken to find a plan: {(end - start).to_sec()} seconds")
return chosen_pose
def _move_base(self, pose: PoseStamped) -> MoveBaseGoal:
goal: MoveBaseGoal = MoveBaseGoal()
goal.target_pose = pose
- goal.target_pose.header.stamp = rospy.Time.now()
self._move_base_client.send_goal(goal)
return goal
- def follow(self) -> None:
- """
- The general loop should be:
- - Get persons pose
- - Go to pose - stopping_distance
- - If person moves significantly, cancel goal and go to new pose
- - If person's pose is too close to the previous pose, skip
- - If person is too close to the robot, face them and skip
- - If we are currently executing a goal, it is too soon to send a new one, and the person has not moved significantly, skip
- - If the person has been static for n_secs_static seconds, check if they are finished
+ def _tts(self, text: str, wait: bool) -> None:
+ if self._tts_client_available:
+ tts_goal: TtsGoal = TtsGoal()
+ tts_goal.rawtext.text = text
+ tts_goal.rawtext.lang_id = "en_GB"
+ if wait:
+ self._tts_client.send_goal_and_wait(tts_goal)
+ else:
+ self._tts_client.send_goal(tts_goal)
- """
+ def follow(self) -> None:
+ person_trajectory: PoseArray = PoseArray()
+ person_trajectory.header.frame_id = "odom"
prev_track: Union[None, Person] = None
- last_track_time: Union[None, rospy.Time] = None
- prev_goal: Union[None, MoveBaseActionGoal] = None
+ last_goal_time: Union[None, rospy.Time] = None
+ going_to_person: bool = False
while not rospy.is_shutdown():
- # No tracks, so skip
- if self._latest_tracks is None:
- rospy.loginfo("No tracks, skipping")
- continue
+ tracks: PersonArray = rospy.wait_for_message("/people_tracked", PersonArray)
+
# Get the most recent track of the person we are following
track = next(
- filter(
- lambda track: track.id == self._track_id, self._latest_tracks.people
- ),
+ filter(lambda track: track.id == self._track_id, tracks.people),
None,
)
- # If we have lost track of the person, finish executing the curret goal and recover the track
if track is None:
rospy.loginfo("Lost track of person, recovering...")
- self._move_base_client.wait_for_result()
+ self._cancel_goal()
+ person_trajectory = PoseArray()
self._recover_track()
+ prev_track = None
continue
- # Get the robot's pose in the odom frame
- robot_pose = self._robot_pose_in_odom()
-
# Distance to the previous pose
dist_to_prev = (
self._euclidian_distance(track.pose, prev_track.pose)
@@ -373,75 +342,50 @@ def follow(self) -> None:
else np.inf
)
- # Distance to the goal
- dist_to_goal = self._euclidian_distance(robot_pose.pose, track.pose)
-
- # Check if the person is too close to the previous pose
- if dist_to_prev < self._min_distance_between_tracks:
- rospy.loginfo("Person too close to previous, skipping")
- # prev_track = track
- if last_track_time is not None:
- if (
- rospy.Time.now().secs - last_track_time.secs
- > self._n_secs_static
- ):
- if self._check_finished():
- break
- else:
- last_track_time = None
- continue
- continue
-
- # Check if the person is too close to the robot
-
- if dist_to_goal < self._stopping_distance:
- rospy.loginfo("Person too close to robot, facing them and skipping")
- pose = PoseStamped(header=self._latest_tracks.header)
- pose.pose = robot_pose.pose
- pose.pose.orientation = self._compute_face_quat(
- robot_pose.pose, track.pose
- )
- pose.header.stamp = rospy.Time.now()
- pose = self._tf_pose(pose, "map")
- prev_goal = self._move_base(pose)
- # prev_track = track
- continue
-
- time_since_last_goal: float = (
- rospy.Time.now().secs - prev_goal.target_pose.header.stamp.secs
- if prev_goal is not None
- else np.inf
- )
-
# Check if the person has moved significantly
- if dist_to_prev > self._new_goal_threshold:
- rospy.loginfo("Person has moved significantly, cancelling goal")
- self._cancel_goal()
- # Check if we are currently executing a goal, it is too soon to send a new one, and the person has not moved significantly
- if (
- self._move_base_client.get_state()
- in [
- GoalStatus.PENDING,
- GoalStatus.ACTIVE,
- ]
- and time_since_last_goal < self._min_time_between_goals
- ):
- rospy.loginfo("Currently executing a goal, skipping")
- prev_track = track
- last_track_time = rospy.Time.now()
- continue
+ if dist_to_prev >= self._new_goal_threshold:
- goal_pose: PoseStamped = self._get_pose_on_path(
- self._tf_pose(robot_pose, "map"),
- self._tf_pose(
- PoseStamped(pose=track.pose, header=self._latest_tracks.header),
+ goal_pose = self._tf_pose(
+ PoseStamped(pose=track.pose, header=tracks.header),
"map",
- ),
- self._stopping_distance,
- )
- if goal_pose is None:
- rospy.loginfo("Failed to find a plan, skipping")
- continue
- prev_goal = self._move_base(goal_pose)
- prev_track = track
- last_track_time = rospy.Time.now()
+ )
+ self._move_base(goal_pose)
+ prev_track = track
+ last_goal_time = rospy.Time.now()
+ person_trajectory.poses.append(track.pose)
+ person_trajectory.header.stamp = rospy.Time.now()
+ self._person_trajectory_pub.publish(person_trajectory)
+ going_to_person = False
+ elif last_goal_time is not None:
+ delta_t: float = (rospy.Time.now() - last_goal_time).to_sec()
+ if (
+ self._n_secs_static_plan_close
+ <= delta_t
+ < self._n_secs_static_finished
+ ) and not going_to_person:
+ self._cancel_goal()
+ goal_pose = self._get_pose_on_path(
+ self._tf_pose(self._robot_pose_in_odom(), "map"),
+ self._tf_pose(
+ PoseStamped(pose=track.pose, header=tracks.header),
+ "map",
+ ),
+ self._stopping_distance,
+ )
+ if goal_pose is not None:
+ self._move_base(goal_pose)
+ going_to_person = True
+ else:
+ rospy.logwarn("Could not find a path to the person")
+ elif delta_t >= self._n_secs_static_finished:
+ rospy.loginfo(
+ "Person has been static for too long, checking if we are finished"
+ )
+ self._move_base_client.wait_for_result()
+ if self._check_finished():
+ rospy.loginfo("Finished following person")
+ break
+ else:
+ rospy.loginfo("Person is not finished, continuing")
+ last_goal_time = None
+ continue
diff --git a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py
index cd9e00203..8b1704c56 100644
--- a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py
+++ b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py
@@ -563,8 +563,8 @@ def predict_frame(
).describe()
result = {
- "attributes": {**rst_person["attributes"], **rst_cloth["attributes"]},
- "description": rst_person["description"] + rst_cloth["description"],
+ **rst_person,
+ **rst_cloth,
}
result = json.dumps(result, indent=4)
diff --git a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py
index c0f07947c..235ba7a4c 100644
--- a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py
+++ b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py
@@ -91,116 +91,20 @@ def from_parent_instance(
)
def describe(self) -> dict:
- male = (
- self.attributes["Male"]
- > self.categories_and_attributes.thresholds_pred["Male"],
- self.attributes["Male"],
- )
- has_hair = (
- self.attributes["hair"]
- > self.categories_and_attributes.thresholds_pred["hair"],
- self.attributes["hair"],
- )
- hair_colour = _max_value_tuple(self.selective_attribute_dict["hair_colour"])
- hair_shape = _max_value_tuple(self.selective_attribute_dict["hair_shape"])
- facial_hair = _max_value_tuple(self.selective_attribute_dict["facial_hair"])
- # bangs = (
- # self.attributes['Bangs'] > self.categories_and_attributes.thresholds_pred['Bangs'],
- # self.attributes['Bangs'])
- hat = (
- self.attributes["Wearing_Hat"]
- > self.categories_and_attributes.thresholds_pred["Wearing_Hat"],
- self.attributes["Wearing_Hat"],
- )
- glasses = (
- self.attributes["Eyeglasses"]
- > self.categories_and_attributes.thresholds_pred["Eyeglasses"],
- self.attributes["Eyeglasses"],
- )
- earrings = (
- self.attributes["Wearing_Earrings"]
- > self.categories_and_attributes.thresholds_pred["Wearing_Earrings"],
- self.attributes["Wearing_Earrings"],
- )
- necklace = (
- self.attributes["Wearing_Necklace"]
- > self.categories_and_attributes.thresholds_pred["Wearing_Necklace"],
- self.attributes["Wearing_Necklace"],
- )
- necktie = (
- self.attributes["Wearing_Necktie"]
- > self.categories_and_attributes.thresholds_pred["Wearing_Necktie"],
- self.attributes["Wearing_Necktie"],
- )
-
- description = "This customer has "
- hair_colour_str = "None"
- hair_shape_str = "None"
- if has_hair[0]:
- hair_shape_str = ""
- if hair_shape[0] == "Straight_Hair":
- hair_shape_str = "straight"
- elif hair_shape[0] == "Wavy_Hair":
- hair_shape_str = "wavy"
- if hair_colour[0] == "Black_Hair":
- description += "black %s hair, " % hair_shape_str
- hair_colour_str = "black"
- elif hair_colour[0] == "Blond_Hair":
- description += "blond %s hair, " % hair_shape_str
- hair_colour_str = "blond"
- elif hair_colour[0] == "Brown_Hair":
- description += "brown %s hair, " % hair_shape_str
- hair_colour_str = "brown"
- elif hair_colour[0] == "Gray_Hair":
- description += "gray %s hair, " % hair_shape_str
- hair_colour_str = "gray"
-
- if (
- male
- ): # here 'male' is only used to determine whether it is confident to decide whether the person has beard
- if not facial_hair[0] == "No_Beard":
- description += "and has beard. "
-
- if hat[0] or glasses[0]:
- description += "I can also see this customer wearing "
- if hat[0] and glasses[0]:
- description += "a hat and a pair of glasses. "
- elif hat[0]:
- description += "a hat. "
- else:
- description += "a pair of glasses. "
-
- if earrings[0] or necklace[0] or necktie[0]:
- description += "This customer might also wear "
- wearables = []
- if earrings[0]:
- wearables.append("a pair of earrings")
- if necklace[0]:
- wearables.append("a necklace")
- if necktie[0]:
- wearables.append("a necktie")
- description += (
- ", ".join(wearables[:-2] + [" and ".join(wearables[-2:])]) + ". "
- )
-
- if description == "This customer has ":
- description = (
- "I didn't manage to get any attributes from this customer, I'm sorry."
- )
+ has_hair = self.attributes["hair"] - 0.5
+ hair_colour = _max_value_tuple(self.selective_attribute_dict["hair_colour"])[0]
+ hair_shape = _max_value_tuple(self.selective_attribute_dict["hair_shape"])[0]
+ facial_hair = 1 - self.attributes["No_Beard"] - 0.5
+ glasses = self.attributes["Eyeglasses"] - 0.5
+ hat = self.attributes["Wearing_Hat"] - 0.5
result = {
- "attributes": {
- "has_hair": has_hair[0],
- "hair_colour": hair_colour_str,
- "hair_shape": hair_shape_str,
- "facial_hair": facial_hair[0] != "No_Beard",
- "hat": hat[0],
- "glasses": glasses[0],
- "earrings": earrings[0],
- "necklace": necklace[0],
- "necktie": necktie[0],
- },
- "description": description,
+ "has_hair": has_hair,
+ "hair_colour": hair_colour,
+ "hair_shape": hair_shape,
+ "facial_hair": facial_hair,
+ "glasses": glasses,
+ "hat": hat,
}
return result
@@ -232,77 +136,46 @@ def from_parent_instance(
)
def describe(self) -> dict:
- top = (
- self.attributes["top"]
- > self.categories_and_attributes.thresholds_pred["top"]
- )
- down = (
- self.attributes["down"]
- > self.categories_and_attributes.thresholds_pred["down"]
- )
- dress = (
- self.attributes["dress"]
- > self.categories_and_attributes.thresholds_pred["dress"]
- )
- outwear = (
- self.attributes["outwear"]
- > self.categories_and_attributes.thresholds_pred["outwear"]
- )
-
result = {
- # not in a loop for now, likely to add more logic combined with a classifier of more specific cloth classes.
- "attributes": {},
- "description": "this descrcription will be completed if we find out it is better to do it here.",
+ "top": self.attributes["top"] / 2,
+ "outwear": self.attributes["outwear"] / 2,
+ "dress": self.attributes["dress"] / 2,
}
+ max_prob = 0.0
+ max_attribute = "short sleeve top"
+ for attribute in ["short sleeve top", "long sleeve top", "vest", "sling"]:
+ if self.attributes[attribute] > max_prob:
+ max_prob = self.attributes[attribute]
+ max_attribute = attribute
+ if max_attribute in ["vest", "sling"]:
+ max_attribute = "sleeveless top"
+ result["max_top"] = max_attribute
+
+ max_prob = 0.0
+ max_attribute = "short sleeve outwear"
for attribute in [
- "short sleeve top",
- "long sleeve top",
"short sleeve outwear",
"long sleeve outwear",
+ ]:
+ if self.attributes[attribute] > max_prob:
+ max_prob = self.attributes[attribute]
+ max_attribute = attribute
+ result["max_outwear"] = max_attribute
+
+ max_prob = 0.0
+ max_attribute = "short sleeve dress"
+ for attribute in [
"short sleeve dress",
"long sleeve dress",
"vest dress",
"sling dress",
- "sleeveless top",
]:
- result["attributes"][attribute] = False
-
- if top:
- max_prob = 0.0
- max_attribute = "short sleeve top"
- for attribute in ["short sleeve top", "long sleeve top", "vest", "sling"]:
- if self.attributes[attribute] > max_prob:
- max_prob = self.attributes[attribute]
- max_attribute = attribute
- if max_attribute in ["vest", "sling"]:
- max_attribute = "sleeveless top"
- result["attributes"][max_attribute] = True
-
- if outwear:
- max_prob = 0.0
- max_attribute = "short sleeve outwear"
- for attribute in [
- "short sleeve outwear",
- "long sleeve outwear",
- ]:
- if self.attributes[attribute] > max_prob:
- max_prob = self.attributes[attribute]
- max_attribute = attribute
- result["attributes"][max_attribute] = True
-
- if dress:
- max_prob = 0.0
- max_attribute = "short sleeve dress"
- for attribute in [
- "short sleeve dress",
- "long sleeve dress",
- "vest dress",
- "sling dress",
- ]:
- if self.attributes[attribute] > max_prob:
- max_prob = self.attributes[attribute]
- max_attribute = attribute
- result["attributes"][max_attribute] = True
+ if self.attributes[attribute] > max_prob:
+ max_prob = self.attributes[attribute]
+ max_attribute = attribute
+ if max_attribute in ["vest dress", "sling dress"]:
+ max_attribute = "sleeveless dress"
+ result["max_dress"] = max_attribute
return result
diff --git a/skills/launch/unit_test_describe_people.launch b/skills/launch/unit_test_describe_people.launch
index 8ef21961f..649279233 100644
--- a/skills/launch/unit_test_describe_people.launch
+++ b/skills/launch/unit_test_describe_people.launch
@@ -4,15 +4,18 @@
-
-
-
-
+
+
+
+
+
+
+
+
+
-
-
diff --git a/skills/src/lasr_skills/describe_people.py b/skills/src/lasr_skills/describe_people.py
index 770fdf582..bd55698bf 100755
--- a/skills/src/lasr_skills/describe_people.py
+++ b/skills/src/lasr_skills/describe_people.py
@@ -5,6 +5,7 @@
import smach
import cv2_img
import numpy as np
+
from lasr_skills import Say
from lasr_vision_msgs.srv import (
YoloDetection,
@@ -15,6 +16,7 @@
from numpy2message import numpy2message
from .vision import GetCroppedImage, ImageMsgToCv2
import numpy as np
+from lasr_skills.validate_keypoints import ValidateKeypoints
class DescribePeople(smach.StateMachine):
@@ -42,13 +44,14 @@ def __init__(self):
object_name="person",
crop_method=crop_method,
rgb_topic=rgb_topic,
- use_mask=True,
+ use_mask=False,
),
transitions={
"succeeded": "CONVERT_IMAGE",
"failed": "SAY_GET_IMAGE_AGAIN",
},
)
+
smach.StateMachine.add(
"SAY_GET_IMAGE_AGAIN",
Say(
@@ -60,10 +63,14 @@ def __init__(self):
"aborted": "GET_IMAGE_AGAIN",
},
)
+
smach.StateMachine.add(
"GET_IMAGE_AGAIN",
GetCroppedImage(
- object_name="person", crop_method="closest", use_mask=True
+ object_name="person",
+ crop_method=crop_method,
+ rgb_topic=rgb_topic,
+ use_mask=False,
),
transitions={"succeeded": "CONVERT_IMAGE", "failed": "SAY_CONTINUE"},
)
diff --git a/skills/src/lasr_skills/say.py b/skills/src/lasr_skills/say.py
old mode 100644
new mode 100755
index 5e0bddb8d..99d9e7670
--- a/skills/src/lasr_skills/say.py
+++ b/skills/src/lasr_skills/say.py
@@ -1,45 +1,91 @@
import smach_ros
+import smach
+import rospy
+import os
-from pal_interaction_msgs.msg import TtsGoal, TtsAction, TtsText
+PUBLIC_CONTAINER: bool = False
+
+try:
+ from pal_interaction_msgs.msg import TtsGoal, TtsAction, TtsText
+except ImportError:
+ PUBLIC_CONTAINER = True
+
+SIMULATION: bool = "tiago" not in os.environ.get("ROS_MASTER_URI", "")
from typing import Union
-class Say(smach_ros.SimpleActionState):
- def __init__(
- self, text: Union[str, None] = None, format_str: Union[str, None] = None
- ):
- if text is not None:
- super(Say, self).__init__(
- "tts",
- TtsAction,
- goal=TtsGoal(rawtext=TtsText(text=text, lang_id="en_GB")),
- )
- elif format_str is not None:
- super(Say, self).__init__(
- "tts",
- TtsAction,
- goal_cb=lambda ud, _: (
- TtsGoal(
- rawtext=TtsText(
- text=format_str.format(*ud.placeholders), lang_id="en_GB"
+if PUBLIC_CONTAINER or SIMULATION:
+
+ class Say(smach.State):
+
+ text: Union[str, None] = None
+ format_str: Union[str, None] = None
+
+ def __init__(
+ self, text: Union[str, None] = None, format_str: Union[str, None] = None
+ ):
+
+ super(Say, self).__init__(outcomes=["succeeded", "aborted", "preempted"])
+
+ self.text = text
+ self.format_str = format_str
+ if PUBLIC_CONTAINER:
+ rospy.logwarn(
+ "You are using the public container, the Say skill will not work"
+ )
+ elif SIMULATION:
+ rospy.logwarn(
+ "You are using the simulation, the Say skill will not work"
+ )
+
+ def execute(self, userdata):
+ if self.text is not None:
+ rospy.loginfo(self.text)
+ elif self.format_str is not None:
+ rospy.loginfo(self.format_str.format(*userdata.placeholders))
+ else:
+ rospy.loginfo(userdata.text)
+ return "succeeded"
+
+else:
+
+ class Say(smach_ros.SimpleActionState):
+ def __init__(
+ self, text: Union[str, None] = None, format_str: Union[str, None] = None
+ ):
+ if text is not None:
+ super(Say, self).__init__(
+ "tts",
+ TtsAction,
+ goal=TtsGoal(rawtext=TtsText(text=text, lang_id="en_GB")),
+ )
+ elif format_str is not None:
+ super(Say, self).__init__(
+ "tts",
+ TtsAction,
+ goal_cb=lambda ud, _: (
+ TtsGoal(
+ rawtext=TtsText(
+ text=format_str.format(*ud.placeholders),
+ lang_id="en_GB",
+ )
)
- )
- if isinstance(ud.placeholders, (list, tuple))
- else TtsGoal(
- rawtext=TtsText(
- text=format_str.format(ud.placeholders), lang_id="en_GB"
+ if isinstance(ud.placeholders, (list, tuple))
+ else TtsGoal(
+ rawtext=TtsText(
+ text=format_str.format(ud.placeholders), lang_id="en_GB"
+ )
)
- )
- ),
- input_keys=["placeholders"],
- )
- else:
- super(Say, self).__init__(
- "tts",
- TtsAction,
- goal_cb=lambda ud, _: TtsGoal(
- rawtext=TtsText(text=ud.text, lang_id="en_GB")
- ),
- input_keys=["text"],
- )
+ ),
+ input_keys=["placeholders"],
+ )
+ else:
+ super(Say, self).__init__(
+ "tts",
+ TtsAction,
+ goal_cb=lambda ud, _: TtsGoal(
+ rawtext=TtsText(text=ud.text, lang_id="en_GB")
+ ),
+ input_keys=["text"],
+ )
diff --git a/tasks/receptionist/scripts/main.py b/tasks/receptionist/scripts/main.py
index f93e2bc24..0bb75592c 100755
--- a/tasks/receptionist/scripts/main.py
+++ b/tasks/receptionist/scripts/main.py
@@ -39,6 +39,24 @@
"drink": "wine",
"dataset": "receptionist",
"confidence": 0.5,
+ "attributes": {
+ "has_hair": 0.5,
+ "hair_shape": "straight hair",
+ "hair_colour": "black hair",
+ "facial_hair": 0.5,
+ "earrings": 0,
+ "necklace": 0,
+ "necktie": 0,
+ # "height": "unknown",
+ "glasses": -0.5,
+ "hat": 0,
+ "dress": 0,
+ "top": 0.5,
+ "outwear": 0,
+ "max_dress": "unknown",
+        "max_top": "short sleeve top",
+ "max_outwear": "unknown",
+ },
},
)
diff --git a/tasks/receptionist/src/receptionist/state_machine.py b/tasks/receptionist/src/receptionist/state_machine.py
index c14cf13ab..b2da528af 100755
--- a/tasks/receptionist/src/receptionist/state_machine.py
+++ b/tasks/receptionist/src/receptionist/state_machine.py
@@ -39,7 +39,7 @@ def __init__(
}
self.userdata.guest_name = "zoe"
self.userdata.dataset = "receptionist"
- self.userdata.confidence = 0.2
+ self.userdata.confidence = 0.15
smach.StateMachine.add(
"SAY_START",
@@ -211,6 +211,27 @@ def __init__(
smach.StateMachine.add(
"GET_GUEST_ATTRIBUTES_GUEST_1",
GetGuestAttributes("guest1"),
+ transitions={
+ "succeeded": "SAY_LEARN_FACES",
+ "failed": "SAY_GET_GUEST_ATTRIBUTE_1_FAILED",
+ },
+ )
+
+ smach.StateMachine.add(
+ "SAY_GET_GUEST_ATTRIBUTE_1_FAILED",
+ Say(
+ text="Make sure you're looking into my eyes and facing me, I can't see you."
+ ),
+ transitions={
+ "succeeded": "GET_GUEST_ATTRIBUTES_GUEST_1_AGAIN",
+ "aborted": "GET_GUEST_ATTRIBUTES_GUEST_1_AGAIN",
+ "preempted": "GET_GUEST_ATTRIBUTES_GUEST_1_AGAIN",
+ },
+ )
+
+ smach.StateMachine.add(
+ "GET_GUEST_ATTRIBUTES_GUEST_1_AGAIN",
+ GetGuestAttributes("guest1"),
transitions={
"succeeded": "SAY_LEARN_FACES",
"failed": "SAY_LEARN_FACES",
@@ -505,6 +526,27 @@ def __init__(
smach.StateMachine.add(
"GET_GUEST_ATTRIBUTES_GUEST_2",
GetGuestAttributes("guest2"),
+ transitions={
+ "succeeded": "SAY_LEARN_FACES_GUEST_2",
+ "failed": "SAY_GET_GUEST_ATTRIBUTE_2_FAILED",
+ },
+ )
+
+ smach.StateMachine.add(
+ "SAY_GET_GUEST_ATTRIBUTE_2_FAILED",
+ Say(
+ text="Make sure you're looking into my eyes and facing me, I can't see you."
+ ),
+ transitions={
+ "succeeded": "GET_GUEST_ATTRIBUTES_GUEST_2_AGAIN",
+ "aborted": "GET_GUEST_ATTRIBUTES_GUEST_2_AGAIN",
+ "preempted": "GET_GUEST_ATTRIBUTES_GUEST_2_AGAIN",
+ },
+ )
+
+ smach.StateMachine.add(
+ "GET_GUEST_ATTRIBUTES_GUEST_2_AGAIN",
+        GetGuestAttributes("guest2")
transitions={
"succeeded": "SAY_LEARN_FACES_GUEST_2",
"failed": "SAY_LEARN_FACES_GUEST_2",
diff --git a/tasks/receptionist/src/receptionist/states/find_and_look_at.py b/tasks/receptionist/src/receptionist/states/find_and_look_at.py
index f952c295b..61efd0977 100755
--- a/tasks/receptionist/src/receptionist/states/find_and_look_at.py
+++ b/tasks/receptionist/src/receptionist/states/find_and_look_at.py
@@ -90,10 +90,14 @@ def check_name(self, ud):
detection.name == ud.guest_data[self.guest_name_in]["name"]
and detection.confidence > ud.confidence
):
- rospy.loginfo(f"Detection {detection.name} has confidence {detection.confidence} > {ud.confidence}")
+ rospy.loginfo(
+ f"Detection {detection.name} has confidence {detection.confidence} > {ud.confidence}"
+ )
return "succeeded"
else:
- rospy.loginfo(f"Detection {detection.name} has confidence {detection.confidence} <= {ud.confidence}")
+ rospy.loginfo(
+ f"Detection {detection.name} has confidence {detection.confidence} <= {ud.confidence}"
+ )
return "failed"
def __init__(
diff --git a/tasks/receptionist/src/receptionist/states/get_attributes.py b/tasks/receptionist/src/receptionist/states/get_attributes.py
index ee8a864a3..6fcca9bc1 100644
--- a/tasks/receptionist/src/receptionist/states/get_attributes.py
+++ b/tasks/receptionist/src/receptionist/states/get_attributes.py
@@ -21,7 +21,7 @@ def execute(self, userdata: UserData) -> str:
return "failed"
userdata.guest_data[self._guest_id]["attributes"] = json.loads(
userdata.people[0]["features"]
- )["attributes"]
+ )
return "succeeded"
def __init__(
diff --git a/tasks/receptionist/src/receptionist/states/introduce.py b/tasks/receptionist/src/receptionist/states/introduce.py
index c184fd14a..cf71d1aed 100644
--- a/tasks/receptionist/src/receptionist/states/introduce.py
+++ b/tasks/receptionist/src/receptionist/states/introduce.py
@@ -8,7 +8,32 @@
import smach
from smach import UserData
from lasr_skills import Say
-from typing import Dict, Any, Optional
+from typing import Dict, List, Any, Optional
+
+
+def find_most_confident_clothes(
+ relevant_guest_data: Dict[str, Any], clothes: List[List[Any]]
+) -> List[Any]:
+ """Find the clothes it's most confident of, after determining the clothes type
+ through confidence values.
+
+ Args:
+ relevant_guest_data (Dict[str, Any]): guest data dictionary.
+ clothes List[List[Any]]: List of the clothes type and their confidence
+
+ Returns:
+ List: Maximum confidence and the relevant clothes
+ """
+ max_clothes_type, max_confidence = max(clothes, key=lambda c: c[1])
+
+ if max_clothes_type == "dress":
+ max_clothes = relevant_guest_data["attributes"]["max_dress"]
+ elif max_clothes_type == "top":
+ max_clothes = relevant_guest_data["attributes"]["max_top"]
+ else:
+ max_clothes = relevant_guest_data["attributes"]["max_outwear"]
+
+ return [max_confidence, max_clothes]
def stringify_guest_data(guest_data: Dict[str, Any], guest_id: str) -> str:
@@ -27,16 +52,22 @@ def stringify_guest_data(guest_data: Dict[str, Any], guest_id: str) -> str:
relevant_guest_data.setdefault(
"attributes",
{
- "has_hair": True,
+ "has_hair": 0,
"hair_shape": "unknown",
"hair_colour": "unknown",
- "facial_hair": "No_Beard",
- "earrings": "unknown",
- "necklace": "unknown",
- "necktie": "unknown",
- "height": "unknown",
- "glasses": False,
- "hat": False,
+ "facial_hair": 0,
+ "earrings": 0,
+ "necklace": 0,
+ "necktie": 0,
+ # "height": "unknown",
+ "glasses": 0,
+ "hat": 0,
+ "dress": 0,
+ "top": 0,
+ "outwear": 0,
+ "max_dress": "unknown",
+ "max_top": "unknown",
+ "max_outwear": "unknown",
},
)
@@ -44,101 +75,90 @@ def stringify_guest_data(guest_data: Dict[str, Any], guest_id: str) -> str:
guest_str += f"{relevant_guest_data['name']}, their favourite drink is {relevant_guest_data['drink']}. "
- known_attributes = {}
- print(relevant_guest_data["attributes"].items())
-
- for attribute, value in relevant_guest_data["attributes"].items():
- if value != "unknown" and value != False and value != "No_Beard":
- known_attributes[attribute] = value
- print("These are the known attributes")
- print(known_attributes)
+ clothes = ["dress", "top", "outwear"]
- has_hair = False
+ filtered_attributes = {}
- for attribute, value in known_attributes.items():
- if attribute == "has_hair":
- has_hair = True
- break
- if has_hair == False:
- guest_str += "They are bald."
+ filtered_attributes["hair"] = {
+ "confidence": relevant_guest_data["attributes"]["has_hair"],
+ "hair_shape": relevant_guest_data["attributes"]["hair_shape"],
+ "hair_colour": relevant_guest_data["attributes"]["hair_colour"],
+ }
+ most_confident_clothes = find_most_confident_clothes(
+ relevant_guest_data,
+ clothes=[
+ ["dress", relevant_guest_data["attributes"]["dress"]],
+ ["top", relevant_guest_data["attributes"]["top"]],
+ ["outwear", relevant_guest_data["attributes"]["outwear"]],
+ ],
+ )
+ filtered_attributes["clothes"] = {
+ "confidence": most_confident_clothes[0],
+ "attribute": most_confident_clothes[1],
+ }
- ignored_attributes = [
+ considered_attributes = [
+ "dress",
"top",
- "down",
"outwear",
- "dress",
- # "short sleeve top",
- # "long sleeve top",
- # "short sleeve outwear",
- # "long sleeve outwear",
- # "vest",
- # "sling",
- "shorts",
- "trousers",
- "skirt",
- # "short sleeve dress",
- # "long sleeve dress",
- # "vest dress",
- # "sling dress",
+ "max_dress",
+ "max_top",
+ "max_outwear",
+ "has_hair",
+ "hair_shape",
+ "hair_colour",
]
+ for attribute, value in relevant_guest_data["attributes"].items():
+ if attribute not in considered_attributes:
+ filtered_attributes[attribute] = {
+ "confidence": relevant_guest_data["attributes"][attribute],
+ "attribute": attribute,
+ }
+
+ sorted_attributes = sorted(
+ filtered_attributes.keys(),
+ key=lambda k: abs(filtered_attributes[k]["confidence"]),
+ reverse=True,
+ )
- ignored_attributes.append("male")
- ignored_attributes.append("has_hair")
-
- for attribute, value in known_attributes.items():
- detection = False # Whenever the an attribute is detected in the for loop, the detection flag is set to true
- # so that multiple attributes are not checked at the same time
- if attribute in ignored_attributes:
- detection = True
- if has_hair:
- if attribute == "hair_shape":
- guest_str += (
- f"Their hair is {relevant_guest_data['attributes'][attribute]}."
- )
- detection = True
- elif attribute == "hair_colour":
- guest_str += (
- f"They have {relevant_guest_data['attributes'][attribute]} hair."
- )
- detection = True
+ top_4_attributes = sorted_attributes[:4]
+
+ top_4_attributes = [
+ attr
+ for attr in top_4_attributes
+ if not (attr == "hair" and filtered_attributes[attr]["confidence"] < 0)
+ ]
+ if len(top_4_attributes) < 4 and len(sorted_attributes) > 4:
+ top_4_attributes.append(sorted_attributes[4])
+
+ for i in range(len(top_4_attributes)):
+ attribute_name = top_4_attributes[i]
+ attribute_value = filtered_attributes[top_4_attributes[i]]
+ confidence = attribute_value["confidence"]
+
+ if attribute_name == "hair":
+ hair_shape = attribute_value["hair_shape"]
+ hair_colour = attribute_value["hair_colour"]
+ guest_str += f"They have {hair_shape} and {hair_colour}. "
+ elif attribute_name == "facial_hair":
+ if confidence < 0:
+ guest_str += "They don't have facial hair. "
+ else:
+ guest_str += "They have facial hair. "
else:
- if attribute == "hair_shape" or attribute == "hair_colour":
- detection = True
- if attribute == "facial_hair":
- guest_str += f"They have facial hair."
- detection = True
- # if relevant_guest_data["attributes"][attribute] != "No_Beard":
- # guest_str += f"They have facial hair."
- # else:
- # guest_str += f"They do not have facial hair."
- # if attribute == "glasses":
- # detection = True
- # if relevant_guest_data["attributes"][attribute]:
- # guest_str += f"They are wearing glasses."
- # else:
- # guest_str += f"They are not wearing glasses."
- # if attribute == "hat":
- # detection = True
- # if relevant_guest_data["attributes"][attribute]:
- # guest_str += f"They are wearing a hat."
- # else:
- # guest_str += f"They are not wearing a hat."
- if not detection:
- if isSingular(attribute):
- guest_str += f"They are wearing a {attribute}."
+ if confidence < 0:
+ if isSingular(attribute_value["attribute"]):
+ guest_str += (
+ f"They are not wearing a {attribute_value['attribute']}."
+ )
+ else:
+ guest_str += f"They are not wearing {attribute_value['attribute']}."
else:
- guest_str += f"They are wearing {attribute}."
-
- # if "attributes" in relevant_guest_data.keys():
-
- # guest_str += f"They have {relevant_guest_data['attributes']['hair_shape']} {relevant_guest_data['attributes']['hair_colour']} hair, and they "
- # guest_str += f"{'have facial hair' if relevant_guest_data['attributes']['facial_hair'] else 'do not have facial hair'}. "
- # guest_str += f"They are {'wearing glasses' if relevant_guest_data['attributes']['glasses'] else 'not wearing glasses'}, "
- # guest_str += f"{'wearing earrings' if relevant_guest_data['attributes']['earrings'] else 'not wearing earrings'}, and "
- # guest_str += f"{'wearing a hat' if relevant_guest_data['attributes']['hat'] else 'not wearing a hat'}. "
- # guest_str += f"They are {'wearing a necklace' if relevant_guest_data['attributes']['necklace'] else 'not wearing a necklace'}, and "
- # guest_str += f"{'wearing a necktie' if relevant_guest_data['attributes']['necktie'] else 'not wearing a necktie'}. "
+ if isSingular(attribute_value["attribute"]):
+ guest_str += f"They are wearing a {attribute_value['attribute']}."
+ else:
+ guest_str += f"They are wearing {attribute_value['attribute']}."
return guest_str