diff --git a/common/helpers/navigation_helpers/src/navigation_helpers/__init__.py b/common/helpers/navigation_helpers/src/navigation_helpers/__init__.py
index 969833b49..e04e20881 100644
--- a/common/helpers/navigation_helpers/src/navigation_helpers/__init__.py
+++ b/common/helpers/navigation_helpers/src/navigation_helpers/__init__.py
@@ -1,9 +1,18 @@
+import rospy
+
+
 from geometry_msgs.msg import (
     Point,
     Pose,
+    PoseStamped,
+    Quaternion,
 )
+from nav_msgs.srv import GetPlan
+from nav_msgs.msg import Path
 import numpy as np
+import math
+from scipy.spatial.transform import Rotation as R
 
 from itertools import permutations
 from typing import Union, List
@@ -27,3 +36,49 @@ def min_hamiltonian_path(start: Pose, poses: List[Pose]) -> Union[None, List[Pos
         best_order = list(perm)
 
     return best_order
+
+
+def get_pose_on_path(
+    p1: PoseStamped, p2: PoseStamped, dist_to_goal: float = 1.0, tolerance: float = 0.5
+) -> Union[None, PoseStamped]:
+    make_plan: rospy.ServiceProxy = rospy.ServiceProxy("/move_base/make_plan", GetPlan)
+
+    chosen_pose: Union[None, PoseStamped] = None
+
+    rospy.loginfo(f"Getting plan from {p1} to {p2}.")
+
+    if p1.header.frame_id != "map" or p2.header.frame_id != "map":
+        rospy.loginfo(
+            f"Frames of reference are not 'map' ({p1.header.frame_id} and {p2.header.frame_id})."
+        )
+        return chosen_pose
+
+    try:
+        make_plan.wait_for_service(timeout=rospy.Duration.from_sec(10.0))
+    except rospy.ROSException:
+        rospy.loginfo("Service /move_base/make_plan not available.")
+        return chosen_pose
+
+    try:
+        plan: Path = make_plan(p1, p2, tolerance).plan
+    except rospy.ServiceException as e:
+        rospy.loginfo(e)
+        return chosen_pose
+
+    rospy.loginfo(f"Got plan with {len(plan.poses)} poses.")
+
+    if len(plan.poses) > 0:
+        for pose in reversed(plan.poses):
+            if euclidian_distance(pose.pose.position, p2.pose.position) >= dist_to_goal:
+                chosen_pose = pose
+                break
+
+    return chosen_pose
+
+
+def compute_face_quat(p1: Pose, p2: Pose) -> Quaternion:
+    dx: float = p2.position.x - p1.position.x
+    dy: float = p2.position.y - p1.position.y
+    theta_deg = np.degrees(math.atan2(dy, dx))
+    x, y, z, w = R.from_euler("z", theta_deg, degrees=True).as_quat()
+    return Quaternion(x, y, z, w)
diff --git a/common/speech/lasr_speech_recognition_whisper/nodes/transcribe_microphone_server b/common/speech/lasr_speech_recognition_whisper/nodes/transcribe_microphone_server
index 0cfa544af..ced0ac5aa 100644
--- a/common/speech/lasr_speech_recognition_whisper/nodes/transcribe_microphone_server
+++ b/common/speech/lasr_speech_recognition_whisper/nodes/transcribe_microphone_server
@@ -87,6 +87,7 @@ class TranscribeSpeechAction(object):
         self._listening = False
 
         self._action_server.start()
+        rospy.loginfo(f"Speech Action server {self._action_name} started")
 
     def _configure_microphone(self) -> sr.Microphone:
         """Configures the microphone for listening to speech based on the
@@ -332,8 +333,8 @@ def configure_model_params(config: dict) -> speech_model_params:
         model_params.mic_device = config["mic_device"]
     if config["no_warmup"]:
         model_params.warmup = False
-    if config["energy_threshold"]:
-        model_params.energy_threshold = config["energy_threshold"]
+    # if config["energy_threshold"]:
+    #     model_params.energy_threshold = config["energy_threshold"]
     if config["pause_threshold"]:
         model_params.pause_threshold = config["pause_threshold"]
 
diff --git a/common/speech/lasr_speech_recognition_whisper/scripts/microphone_tuning_test.py b/common/speech/lasr_speech_recognition_whisper/scripts/microphone_tuning_test.py
index 806e801d0..13211e95b 100644
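A quick sanity check for the compute_face_quat helper added above. This is a minimal sketch, not part of the patch; it assumes a sourced ROS workspace in which this package is importable as navigation_helpers:

from geometry_msgs.msg import Point, Pose

from navigation_helpers import compute_face_quat

# Face from the origin towards a point one metre along +y: yaw should be 90 degrees.
p1 = Pose(position=Point(0.0, 0.0, 0.0))
p2 = Pose(position=Point(0.0, 1.0, 0.0))

q = compute_face_quat(p1, p2)
# A 90 degree yaw about z gives z = sin(45 deg) and w = cos(45 deg), both ~0.707.
print(round(q.z, 3), round(q.w, 3))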
--- a/common/speech/lasr_speech_recognition_whisper/scripts/microphone_tuning_test.py +++ b/common/speech/lasr_speech_recognition_whisper/scripts/microphone_tuning_test.py @@ -28,6 +28,7 @@ def main(): args = parse_args() recognizer = sr.Recognizer() + recognizer.pause_threshold = 2 microphone = sr.Microphone(device_index=args["device_index"], sample_rate=16000) threshold = 100 recognizer.dynamic_energy_threshold = False @@ -39,7 +40,9 @@ def main(): while transcription_result != "": print(f"Listening...") with microphone as source: - wav_data = recognizer.listen(source).get_wav_data() + wav_data = recognizer.listen( + source, phrase_time_limit=10, timeout=5 + ).get_wav_data() print(f"Processing...") # Magic number 32768.0 is the maximum value of a 16-bit signed integer float_data = ( diff --git a/common/speech/lasr_speech_recognition_whisper/scripts/repeat_after_me.py b/common/speech/lasr_speech_recognition_whisper/scripts/repeat_after_me.py index 2e6b20622..cd401b9b7 100644 --- a/common/speech/lasr_speech_recognition_whisper/scripts/repeat_after_me.py +++ b/common/speech/lasr_speech_recognition_whisper/scripts/repeat_after_me.py @@ -18,6 +18,7 @@ if USE_ACTIONLIB: client = actionlib.SimpleActionClient("transcribe_speech", TranscribeSpeechAction) + rospy.loginfo("Waiting for server...") client.wait_for_server() repeating = False rospy.loginfo("Done waiting") diff --git a/common/speech/lasr_speech_recognition_whisper/scripts/test_microphones.py b/common/speech/lasr_speech_recognition_whisper/scripts/test_microphones.py index 921097691..2ce910f56 100644 --- a/common/speech/lasr_speech_recognition_whisper/scripts/test_microphones.py +++ b/common/speech/lasr_speech_recognition_whisper/scripts/test_microphones.py @@ -34,9 +34,10 @@ def main(args: dict) -> None: output_dir = args["output_dir"] r = sr.Recognizer() - with sr.Microphone(device_index=13, sample_rate=16000) as source: + r.pause_threshold = 2 + with sr.Microphone(device_index=9, sample_rate=16000) as source: print("Say something!") - audio = r.listen(source, timeout=5, phrase_time_limit=5) + audio = r.listen(source, timeout=5, phrase_time_limit=10) print("Finished listening") with open(os.path.join(output_dir, "microphone.raw"), "wb") as f: diff --git a/common/vision/lasr_vision_cropped_detection/src/lasr_vision_cropped_detection/cropped_detection.py b/common/vision/lasr_vision_cropped_detection/src/lasr_vision_cropped_detection/cropped_detection.py index e9360cc57..ad3c09f46 100644 --- a/common/vision/lasr_vision_cropped_detection/src/lasr_vision_cropped_detection/cropped_detection.py +++ b/common/vision/lasr_vision_cropped_detection/src/lasr_vision_cropped_detection/cropped_detection.py @@ -171,10 +171,10 @@ def _3d_bbox_crop( ) for det in detections ] + if crop_method == "closest": detections = [det for _, det in sorted(zip(distances, detections))] distances.sort() - elif crop_method == "furthest": detections = [ det for _, det in sorted(zip(distances, detections), reverse=True) diff --git a/common/vision/lasr_vision_feature_extraction/nodes/service b/common/vision/lasr_vision_feature_extraction/nodes/service index ede18a38d..7a70f7868 100644 --- a/common/vision/lasr_vision_feature_extraction/nodes/service +++ b/common/vision/lasr_vision_feature_extraction/nodes/service @@ -4,8 +4,8 @@ from lasr_vision_msgs.srv import ( TorchFaceFeatureDetectionDescriptionResponse, ) from lasr_vision_feature_extraction.categories_and_attributes import ( - CategoriesAndAttributes, - CelebAMaskHQCategoriesAndAttributes, + # CategoriesAndAttributes, 
+ # CelebAMaskHQCategoriesAndAttributes, DeepFashion2GeneralizedCategoriesAndAttributes, ) diff --git a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py index 65602b818..afe15fd11 100644 --- a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py +++ b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/__init__.py @@ -1,6 +1,5 @@ import json from os import path - import cv2 import numpy as np import rospkg @@ -11,12 +10,10 @@ import torchvision.models as models from lasr_vision_feature_extraction.categories_and_attributes import ( CategoriesAndAttributes, - CelebAMaskHQCategoriesAndAttributes, DeepFashion2GeneralizedCategoriesAndAttributes, ) from lasr_vision_feature_extraction.image_with_masks_and_attributes import ( ImageWithMasksAndAttributes, - ImageOfPerson, ImageOfCloth, ) from lasr_vision_msgs.srv import Vqa, VqaRequest @@ -55,129 +52,6 @@ def forward(self, x_copy, x): return x -class UNetWithResnetEncoder(nn.Module): - def __init__(self, num_classes, in_channels=3, freeze_bn=False, sigmoid=True): - super(UNetWithResnetEncoder, self).__init__() - self.sigmoid = sigmoid - self.resnet = models.resnet34( - pretrained=False - ) # Initialize with a ResNet model - if in_channels != 3: - self.resnet.conv1 = nn.Conv2d( - in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False - ) - - self.encoder1 = nn.Sequential( - self.resnet.conv1, self.resnet.bn1, self.resnet.relu - ) - self.encoder2 = self.resnet.layer1 - self.encoder3 = self.resnet.layer2 - self.encoder4 = self.resnet.layer3 - self.encoder5 = self.resnet.layer4 - - self.up1 = Decoder(512, 256, 256) - self.up2 = Decoder(256, 128, 128) - self.up3 = Decoder(128, 64, 64) - self.up4 = Decoder(64, 64, 64) - - self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1) - self._initialize_weights() - - if freeze_bn: - self.freeze_bn() - - def _initialize_weights(self): - for module in self.modules(): - if isinstance(module, nn.Conv2d) or isinstance(module, nn.ConvTranspose2d): - nn.init.kaiming_normal_(module.weight) - if module.bias is not None: - module.bias.data.zero_() - elif isinstance(module, nn.BatchNorm2d): - module.weight.data.fill_(1) - module.bias.data.zero_() - - def forward(self, x): - x1 = self.encoder1(x) - x2 = self.encoder2(x1) - x3 = self.encoder3(x2) - x4 = self.encoder4(x3) - x5 = self.encoder5(x4) - - x = self.up1(x4, x5) - x = self.up2(x3, x) - x = self.up3(x2, x) - x = self.up4(x1, x) - x = F.interpolate( - x, size=(x.size(2) * 2, x.size(3) * 2), mode="bilinear", align_corners=True - ) - - x = self.final_conv(x) - - if self.sigmoid: - x = torch.sigmoid(x) - return x - - def freeze_bn(self): - for module in self.modules(): - if isinstance(module, nn.BatchNorm2d): - module.eval() - - def unfreeze_bn(self): - for module in self.modules(): - if isinstance(module, nn.BatchNorm2d): - module.train() - - -class MultiLabelResNet(nn.Module): - def __init__(self, num_labels, input_channels=3, sigmoid=True): - super(MultiLabelResNet, self).__init__() - self.model = models.resnet34(pretrained=False) - self.sigmoid = sigmoid - - if input_channels != 3: - self.model.conv1 = nn.Conv2d( - input_channels, 64, kernel_size=7, stride=2, padding=3, bias=False - ) - - num_ftrs = self.model.fc.in_features - - self.model.fc = nn.Linear(num_ftrs, num_labels) - - def forward(self, x): - x = self.model(x) - if self.sigmoid: - x = 
torch.sigmoid(x) - return x - - -class CombinedModel(nn.Module): - def __init__( - self, segment_model: nn.Module, predict_model: nn.Module, cat_layers: int = None - ): - super(CombinedModel, self).__init__() - self.segment_model = segment_model - self.predict_model = predict_model - self.cat_layers = cat_layers - self.freeze_seg = False - - def forward(self, x: torch.Tensor): - seg_masks = self.segment_model(x) - seg_masks_ = seg_masks.detach() - if self.cat_layers: - seg_masks_ = seg_masks_[:, 0 : self.cat_layers] - x = torch.cat((x, seg_masks_), dim=1) - else: - x = torch.cat((x, seg_masks_), dim=1) - logic_outputs = self.predict_model(x) - return seg_masks, logic_outputs - - def freeze_segment_model(self): - self.segment_model.eval() - - def unfreeze_segment_model(self): - self.segment_model.train() - - class SegmentPredictor(nn.Module): def __init__(self, num_masks, num_labels, in_channels=3, sigmoid=True): super(SegmentPredictor, self).__init__() @@ -445,32 +319,6 @@ def predict(self, rgb_image: np.ndarray) -> ImageWithMasksAndAttributes: return image_obj -def load_face_classifier_model(): - cat_layers = CelebAMaskHQCategoriesAndAttributes.merged_categories.keys().__len__() - segment_model = UNetWithResnetEncoder(num_classes=cat_layers) - predictions = ( - len(CelebAMaskHQCategoriesAndAttributes.attributes) - - len(CelebAMaskHQCategoriesAndAttributes.avoided_attributes) - + len(CelebAMaskHQCategoriesAndAttributes.mask_labels) - ) - predict_model = MultiLabelResNet( - num_labels=predictions, input_channels=cat_layers + 3 - ) - model = CombinedModel(segment_model, predict_model, cat_layers=cat_layers) - model.eval() - - r = rospkg.RosPack() - model, _, _, _ = load_torch_model( - model, - None, - path=path.join( - r.get_path("lasr_vision_feature_extraction"), "models", "face_model.pth" - ), - cpu_only=True, - ) - return model - - def load_cloth_classifier_model(): num_classes = len(DeepFashion2GeneralizedCategoriesAndAttributes.attributes) model = SegmentPredictorBbox( @@ -597,7 +445,12 @@ def predict_frame( rst_person["hair_shape"] = "long hair" result = { - **rst_person, + "has_hair": 0.0, + "hair_colour": "Not used.", + "hair_shape": "Not used.", + "facial_hair": 0.0, + "glasses": 0.0, + "hat": 0.0, **rst_cloth, } diff --git a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/categories_and_attributes.py b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/categories_and_attributes.py index 87980c3e2..2b3663915 100644 --- a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/categories_and_attributes.py +++ b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/categories_and_attributes.py @@ -10,166 +10,6 @@ class CategoriesAndAttributes: thresholds_pred: dict[str, float] = {} -class CelebAMaskHQCategoriesAndAttributes(CategoriesAndAttributes): - mask_categories = [ - "cloth", - "r_ear", - "hair", - "l_brow", - "l_eye", - "l_lip", - "mouth", - "neck", - "nose", - "r_brow", - "r_ear", - "r_eye", - "skin", - "u_lip", - "hat", - "l_ear", - "neck_l", - "eye_g", - ] - merged_categories = { - "ear": [ - "l_ear", - "r_ear", - ], - "brow": [ - "l_brow", - "r_brow", - ], - "eye": [ - "l_eye", - "r_eye", - ], - "mouth": [ - "l_lip", - "u_lip", - "mouth", - ], - } - _categories_to_merge = [] - for key in sorted(list(merged_categories.keys())): - for cat in merged_categories[key]: - _categories_to_merge.append(cat) - for key in mask_categories: - if key not in _categories_to_merge: 
- merged_categories[key] = [key] - mask_labels = ["hair"] - selective_attributes = { - "facial_hair": [ - "5_o_Clock_Shadow", - "Goatee", - "Mustache", - "No_Beard", - "Sideburns", - ], - "hair_colour": [ - "Black_Hair", - "Blond_Hair", - "Brown_Hair", - "Gray_Hair", - ], - "hair_shape": [ - "Straight_Hair", - "Wavy_Hair", - ], - } - plane_attributes = [ - "Bangs", - "Eyeglasses", - "Wearing_Earrings", - "Wearing_Hat", - "Wearing_Necklace", - "Wearing_Necktie", - "Male", - ] - avoided_attributes = [ - "Arched_Eyebrows", - "Bags_Under_Eyes", - "Big_Lips", - "Big_Nose", - "Bushy_Eyebrows", - "Chubby", - "Double_Chin", - "High_Cheekbones", - "Narrow_Eyes", - "Oval_Face", - "Pointy_Nose", - "Receding_Hairline", - "Rosy_Cheeks", - "Heavy_Makeup", - "Wearing_Lipstick", - "Attractive", - "Blurry", - "Mouth_Slightly_Open", - "Pale_Skin", - "Smiling", - "Young", - ] - attributes = [ - "5_o_Clock_Shadow", - "Arched_Eyebrows", - "Attractive", - "Bags_Under_Eyes", - "Bald", - "Bangs", - "Big_Lips", - "Big_Nose", - "Black_Hair", - "Blond_Hair", - "Blurry", - "Brown_Hair", - "Bushy_Eyebrows", - "Chubby", - "Double_Chin", - "Eyeglasses", - "Goatee", - "Gray_Hair", - "Heavy_Makeup", - "High_Cheekbones", - "Male", - "Mouth_Slightly_Open", - "Mustache", - "Narrow_Eyes", - "No_Beard", - "Oval_Face", - "Pale_Skin", - "Pointy_Nose", - "Receding_Hairline", - "Rosy_Cheeks", - "Sideburns", - "Smiling", - "Straight_Hair", - "Wavy_Hair", - "Wearing_Earrings", - "Wearing_Hat", - "Wearing_Lipstick", - "Wearing_Necklace", - "Wearing_Necktie", - "Young", - ] - - thresholds_mask: dict[str, float] = {} - thresholds_pred: dict[str, float] = {} - - # set default thresholds: - for key in sorted(merged_categories.keys()): - thresholds_mask[key] = 0.5 - for key in attributes + mask_labels: - if key not in avoided_attributes: - thresholds_pred[key] = 0.5 - - # set specific thresholds: - thresholds_mask["eye_g"] = 0.5 - thresholds_pred["Eyeglasses"] = 0.5 - thresholds_pred["Wearing_Earrings"] = 0.5 - thresholds_pred["Wearing_Necklace"] = 0.5 - thresholds_pred["Wearing_Necktie"] = 0.5 - - class DeepFashion2GeneralizedCategoriesAndAttributes(CategoriesAndAttributes): mask_categories = [ "short sleeve top", diff --git a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py index 235ba7a4c..acac6d7a1 100644 --- a/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py +++ b/common/vision/lasr_vision_feature_extraction/src/lasr_vision_feature_extraction/image_with_masks_and_attributes.py @@ -3,6 +3,120 @@ CategoriesAndAttributes, ) +reference_colours = { + "blue_very_light": np.array([240, 248, 255]), # Alice blue + "blue_light": np.array([173, 216, 230]), # Light blue + "blue_sky": np.array([135, 206, 235]), # Sky blue + "blue_powder": np.array([176, 224, 230]), # Powder blue + "blue_celeste": np.array([178, 255, 255]), # Celeste, very pale blue shade + "blue_periwinkle": np.array( + [204, 204, 255] + ), # Periwinkle, a mix of light blue and lavender + "blue_cadet": np.array([95, 158, 160]), # Cadet blue, a muted blue-green + "blue": np.array([0, 0, 255]), # Standard blue + "blue_royal": np.array([65, 105, 225]), # Royal blue + "blue_deep": np.array([0, 0, 139]), # Deep blue + "blue_dark": np.array([0, 0, 128]), # Dark blue + # "blue_navy": np.array([0, 0, 80]), # Navy blue + "yellow_very_light": 
np.array([255, 255, 204]), # Very light yellow + "yellow_light": np.array([255, 255, 224]), # Light yellow + "yellow": np.array([255, 255, 0]), # Standard yellow + "yellow_gold": np.array([255, 215, 0]), # Gold yellow + "yellow_dark": np.array([204, 204, 0]), # Dark yellow + "yellow_mustard": np.array([255, 219, 88]), # Mustard yellow + "red_very_light": np.array([255, 204, 204]), # Very light red + "red_light": np.array([255, 102, 102]), # Light red + "red": np.array([255, 0, 0]), # Standard red + "red_dark": np.array([139, 0, 0]), # Dark red + "red_maroon": np.array([128, 0, 0]), # Maroon + "orange_very_light": np.array([255, 229, 180]), # Very light orange + "orange_light": np.array([255, 179, 71]), # Light orange + "orange": np.array([255, 165, 0]), # Standard orange + "orange_dark": np.array([255, 140, 0]), # Dark orange + "orange_burnt": np.array([204, 85, 0]), # Burnt orange + "black": np.array([30, 30, 30]), # Black + "white": np.array([255, 255, 255]), # White + "gray": np.array([160, 160, 160]), # Gray +} + +colour_group_map = { + "blue_very_light": "blue", + "blue_light": "blue", + "blue_sky": "blue", + "blue_powder": "blue", + "blue_celeste": "blue", + "blue_periwinkle": "blue", + "blue_cadet": "blue", + "blue": "blue", + "blue_royal": "blue", + "blue_deep": "blue", + "blue_dark": "blue", + "yellow_very_light": "yellow", + "yellow_light": "yellow", + "yellow": "yellow", + "yellow_gold": "yellow", + "yellow_dark": "yellow", + "yellow_mustard": "yellow", + "red_very_light": "red", + "red_light": "red", + "red": "red", + "red_dark": "red", + "red_maroon": "red", + "orange_very_light": "orange", + "orange_light": "orange", + "orange": "orange", + "orange_dark": "orange", + "orange_burnt": "orange", + "black": "black", + "white": "white", + "gray": "gray", +} + +possible_colours = ["blue", "yellow", "red", "orange", "black", "white", "gray"] + + +def estimate_colour(rgb_array): + # Calculate distances to each reference colour + # distances = {colour: cie76_distance(rgb_array, ref_rgb) for colour, ref_rgb in reference_colours.items()} + distances = { + colour: np.linalg.norm(rgb_array - ref_rgb) + for colour, ref_rgb in reference_colours.items() + } + + # Find the colour with the smallest distance + estimated_colour = min(distances, key=distances.get) + + return estimated_colour + + +def split_and_sample_colours(image, mask, square_size): + height, width, _ = image.shape + squares_colours = {} + valid_squares = set() + + square_index = 0 + + for y in range(0, height, square_size): + for x in range(0, width, square_size): + square = image[y : y + square_size, x : x + square_size] + mask_square = mask[y : y + square_size, x : x + square_size] + + # Calculate the average colour + average_colour = square.mean(axis=(0, 1)) + + # Save the average colour in the dictionary + squares_colours[square_index] = estimate_colour(average_colour) + # squares_colours[square_index] = average_colour # estimate_colour(average_colour) + + # Check the mask condition + a = np.sum(mask_square) + if np.sum(mask_square) > 0.5 * square_size * square_size: + valid_squares.add(square_index) + + square_index += 1 + + return squares_colours, valid_squares + def _softmax(x: list[float]) -> list[float]: """Compute softmax values for each set of scores in x without using NumPy.""" @@ -60,56 +174,6 @@ def describe(self) -> str: pass -def _max_value_tuple(some_dict: dict[str, float]) -> tuple[str, float]: - max_key = max(some_dict, key=some_dict.get) - return max_key, some_dict[max_key] - - -class 
ImageOfPerson(ImageWithMasksAndAttributes):
-    def __init__(
-        self,
-        image: np.ndarray,
-        masks: dict[str, np.ndarray],
-        attributes: dict[str, float],
-        categories_and_attributes: CategoriesAndAttributes,
-    ):
-        super().__init__(image, masks, attributes, categories_and_attributes)
-
-    @classmethod
-    def from_parent_instance(
-        cls, parent_instance: ImageWithMasksAndAttributes
-    ) -> "ImageOfPerson":
-        """
-        Creates an instance of ImageOfPerson using the properties of an
-        instance of ImageWithMasksAndAttributes.
-        """
-        return cls(
-            image=parent_instance.image,
-            masks=parent_instance.masks,
-            attributes=parent_instance.attributes,
-            categories_and_attributes=parent_instance.categories_and_attributes,
-        )
-
-    def describe(self) -> dict:
-        has_hair = self.attributes["hair"] - 0.5
-        hair_colour = _max_value_tuple(self.selective_attribute_dict["hair_colour"])[0]
-        hair_shape = _max_value_tuple(self.selective_attribute_dict["hair_shape"])[0]
-        facial_hair = 1 - self.attributes["No_Beard"] - 0.5
-        glasses = self.attributes["Eyeglasses"] - 0.5
-        hat = self.attributes["Wearing_Hat"] - 0.5
-
-        result = {
-            "has_hair": has_hair,
-            "hair_colour": hair_colour,
-            "hair_shape": hair_shape,
-            "facial_hair": facial_hair,
-            "glasses": glasses,
-            "hat": hat,
-        }
-
-        return result
-
-
 class ImageOfCloth(ImageWithMasksAndAttributes):
     def __init__(
         self,
@@ -136,6 +200,8 @@ def from_parent_instance(
         )
 
     def describe(self) -> dict:
+        # Maybe drop 'sleeveless' later; that can be decided in the actual environment.
+
         result = {
             "top": self.attributes["top"] / 2,
             "outwear": self.attributes["outwear"] / 2,
@@ -178,4 +244,38 @@ def describe(self) -> dict:
                 max_attribute = "sleeveless dress"
             result["max_dress"] = max_attribute
 
+        # QUICK FIX that won't break the code, but 'dress' is no longer returned.
+        result["dress"] = 0.0
+
+        # ========= colour estimation below: =========
+        # blurred_image kept here so that we can quickly make it work again if we need.
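The colour loop below leans on the nearest-reference-colour matching added earlier in this diff (estimate_colour). A self-contained sketch of that idea, with a trimmed copy of the patch's reference table; note the deliberately non-zero "black" anchor:

import numpy as np

# Trimmed copy of the patch's reference_colours table ("black" is [30, 30, 30],
# not pure black, so dark but non-zero pixels still match it).
reference_colours = {
    "black": np.array([30, 30, 30]),
    "white": np.array([255, 255, 255]),
    "gray": np.array([160, 160, 160]),
    "red": np.array([255, 0, 0]),
}

def nearest_reference_colour(rgb: np.ndarray) -> str:
    # Plain Euclidean distance in RGB space; the smallest distance wins.
    distances = {
        name: np.linalg.norm(rgb - ref) for name, ref in reference_colours.items()
    }
    return min(distances, key=distances.get)

print(nearest_reference_colour(np.array([40, 35, 33])))  # -> "black"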
+ # blurred_image = gaussian_blur(self.image, kernel_size=13, rep=3) + blurred_image = self.image + for cloth in ["top", "outwear", "dress"]: # "down", + mask = self.masks[cloth] + squares_colours, valid_squares = split_and_sample_colours( + blurred_image, mask, 20 + ) + _squares_colours = {} + for k in squares_colours.keys(): + if k in valid_squares: + _squares_colours[k] = squares_colours[k] + squares_colours = { + k: colour_group_map[colour] for k, colour in _squares_colours.items() + } + squares_colours_count = {} + for k, colour in squares_colours.items(): + if colour not in squares_colours_count.keys(): + squares_colours_count[colour] = 1 + else: + squares_colours_count[colour] += 1 + print(squares_colours_count) + tag = cloth + "_colour" + result[tag] = {} + for k in possible_colours: + if k in squares_colours_count.keys(): + result[tag][k] = squares_colours_count[k] / len(squares_colours) + else: + result[tag][k] = 0.0 + return result diff --git a/common/vision/lasr_vision_msgs/srv/Vqa.srv b/common/vision/lasr_vision_msgs/srv/Vqa.srv index f1894242b..5675240f7 100644 --- a/common/vision/lasr_vision_msgs/srv/Vqa.srv +++ b/common/vision/lasr_vision_msgs/srv/Vqa.srv @@ -1,3 +1,4 @@ +sensor_msgs/Image image string[] possible_answers sensor_msgs/Image image_raw diff --git a/skills/config/motions.yaml b/skills/config/motions.yaml index 9409d156b..8af6ac21f 100644 --- a/skills/config/motions.yaml +++ b/skills/config/motions.yaml @@ -84,33 +84,33 @@ play_motion: u3l: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.35, 0.15] + - positions: [0.4, 0.35, 0.1] time_from_start: 0.0 u3m: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.0, 0.15] + - positions: [0.4, 0.0, 0.1] time_from_start: 0.0 u3r: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, -0.35, 0.15] + - positions: [0.4, -0.35, 0.1] time_from_start: 0.0 u2l: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.35, 0.05] + - positions: [0.35, 0.35, 0.05] time_from_start: 0.0 u2m: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, 0.0, 0.05] + - positions: [0.35, 0.0, 0.05] time_from_start: 0.0 u2r: joints: [torso_lift_joint, head_1_joint, head_2_joint] points: - - positions: [0.4, -0.35, 0.05] + - positions: [0.35, -0.35, 0.05] time_from_start: 0.0 u1l: joints: [torso_lift_joint, head_1_joint, head_2_joint] diff --git a/skills/launch/unit_test_describe_people.launch b/skills/launch/unit_test_describe_people.launch index f1b76530e..29427f35b 100644 --- a/skills/launch/unit_test_describe_people.launch +++ b/skills/launch/unit_test_describe_people.launch @@ -3,9 +3,10 @@ - + + diff --git a/skills/package.xml b/skills/package.xml index 053a2c85b..834ad7fc2 100644 --- a/skills/package.xml +++ b/skills/package.xml @@ -58,6 +58,6 @@ - + diff --git a/skills/scripts/unit_test_adjust_camera.py b/skills/scripts/unit_test_adjust_camera.py index 51fe74251..3fcd68d91 100644 --- a/skills/scripts/unit_test_adjust_camera.py +++ b/skills/scripts/unit_test_adjust_camera.py @@ -11,7 +11,11 @@ with sm: sm.add( "AdjustCamera", - AdjustCamera(debug=True), + AdjustCamera( + max_attempts=3, + debug=False, + init_state="u1m", + ), transitions={"finished": "end", "failed": "end", "truncated": "end"}, ) sm.execute() diff --git a/skills/src/lasr_skills/__init__.py b/skills/src/lasr_skills/__init__.py index ce4f199ce..56f0619c3 100755 --- a/skills/src/lasr_skills/__init__.py +++ 
b/skills/src/lasr_skills/__init__.py @@ -12,8 +12,6 @@ from .listen import Listen from .listen_for import ListenFor from .ask_and_listen import AskAndListen -from .find_person import FindPerson -from .find_named_person import FindNamedPerson from .receive_object import ReceiveObject from .handover_object import HandoverObject from .ask_and_listen import AskAndListen @@ -22,5 +20,11 @@ from .recognise import Recognise from .detect_gesture import DetectGesture from .look_at_person import LookAtPerson -from .find_gesture_person import FindGesturePerson +# from .find_gesture_person import FindGesturePerson from .adjust_camera import AdjustCamera +from .guide import Guide +from .detect_clothing import DetectClothing +from .detect_pose import DetectPose +from .find_person import FindPerson +from .xml_question_answer import XmlQuestionAnswer +from .find_person_and_tell import FindPersonAndTell diff --git a/skills/src/lasr_skills/adjust_camera.py b/skills/src/lasr_skills/adjust_camera.py index c885398ae..46057e4ff 100644 --- a/skills/src/lasr_skills/adjust_camera.py +++ b/skills/src/lasr_skills/adjust_camera.py @@ -9,6 +9,7 @@ import rospkg import rosparam import os +import math LEFT = { @@ -77,14 +78,17 @@ (0, 1): "mr", } +inverse_position_dict = {value: key for key, value in position_dict.items()} + class AdjustCamera(smach.StateMachine): def __init__( self, bodypix_model: str = "resnet50", bodypix_confidence: float = 0.7, - max_attempts=1000, + max_attempts=5, debug=False, + init_state="u1m", ): smach.StateMachine.__init__( self, @@ -105,8 +109,8 @@ def __init__( with self: smach.StateMachine.add( - "init_u2m", - PlayMotion(motion_name="u2m"), + "init", + PlayMotion(motion_name=init_state), transitions={ "succeeded": "GET_IMAGE", "aborted": "GET_IMAGE", @@ -122,7 +126,7 @@ def __init__( else: _transitions = { "succeeded": "DECIDE_ADJUST_CAMERA", - "failed": "failed", + "failed": "GET_IMAGE", } smach.StateMachine.add( "GET_IMAGE", @@ -131,10 +135,7 @@ def __init__( method="closest", use_mask=True, ), - transitions={ - "succeeded": "DECIDE_ADJUST_CAMERA", - "failed": "failed", - }, + transitions=_transitions, ) if debug: @@ -158,6 +159,7 @@ def __init__( bodypix_model=bodypix_model, bodypix_confidence=bodypix_confidence, max_attempts=max_attempts, + init_state=init_state, ), transitions=_transitions, ) @@ -176,10 +178,10 @@ def __init__( class DecideAdjustCamera(smach.State): def __init__( self, - # keypoints_to_detect: List[str] = ALL_KEYS, bodypix_model: str = "resnet50", bodypix_confidence: float = 0.7, - max_attempts=1000, + max_attempts=5, + init_state="u1m", ): smach.State.__init__( self, @@ -195,18 +197,17 @@ def __init__( output_keys=[], ) self.max_attempts = max_attempts - # self._keypoints_to_detect = keypoints_to_detect self._bodypix_model = bodypix_model self._bodypix_confidence = bodypix_confidence self._bodypix_client = rospy.ServiceProxy( "/bodypix/keypoint_detection", BodyPixKeypointDetection ) - self.position = [2, 0] + self.position = [i for i in inverse_position_dict[init_state]] self.counter = 0 def execute(self, userdata): - + rospy.logwarn(f"Start attempt number {self.counter}.") req = BodyPixKeypointDetectionRequest() req.image_raw = userdata.img_msg req.dataset = self._bodypix_model @@ -243,10 +244,15 @@ def execute(self, userdata): rospy.logwarn( f"missing shoulders: {missing_keypoints.intersection(MIDDLE)}, missing eyes: {missing_keypoints.intersection(HEAD)}" ) - # has_torso = len(missing_keypoints.intersection(TORSO)) <= 1 if not has_more_than_one_shoulder and not 
has_more_than_one_one_eye:
-            # 'Try recovery behaviour or give up, need a bit polish
+            # This is the case where none of the centre points can be used;
+            # most likely the guest is standing either too close or not in the camera at all.
+            # However we may still try to get this person back into the frame if some parts of them are detected.
+            # Otherwise we say something like "Please stand in front of me but keep a bit of distance.".
+            rospy.logwarn(
+                "The person might not actually be in the frame, trying to recover."
+            )
             miss_head = len(missing_keypoints.intersection(HEAD)) >= 2
             miss_middle = len(missing_keypoints.intersection(MIDDLE)) >= 2
             miss_torso = len(missing_keypoints.intersection(TORSO)) >= 4
@@ -263,14 +269,7 @@ def execute(self, userdata):
                 f"Needs to move up: {needs_to_move_up}, down: {needs_to_move_down}, left: {needs_to_move_left}, right: {needs_to_move_right}."
             )
 
-        # if counter > maxmum, check if head is in, if not, move up to get head, otherwise return finished.
-        if self.counter > self.max_attempts:
-            if not miss_head or self.counter > self.max_attempts + 2:
-                return "truncated"
-
-        # self.counter += 1
         if not (needs_to_move_left and needs_to_move_right):
-            # return "failed"
             if needs_to_move_left:
                 self.position = (
                     self.position[0],
                     (
                         self.position[1] - 1
                         if self.position[1] > -1
                         else self.position[1]
                     ),
                 )
-                return position_dict[self.position]
-            if needs_to_move_right:
+            elif needs_to_move_right:
                 self.position = (
                     self.position[0],
                     (
                         self.position[1] + 1
                         if self.position[1] < 1
                         else self.position[1]
                     ),
                 )
-                return position_dict[self.position]
-        if needs_to_move_up and needs_to_move_down:
-            return "failed"
-        if needs_to_move_up:
-            self.position = (
-                (
-                    self.position[0] + 1
-                    if self.position[0] < 3
-                    else self.position[0]
-                ),
-                self.position[1],
-            )
-            return position_dict[userdata.position]
-        if needs_to_move_down:
-            self.position = (
-                (
-                    self.position[0] - 1
-                    if self.position[0] > 0
-                    else self.position[0]
-                ),
-                self.position[1],
-            )
-            return position_dict[userdata.position]
-        return "finished"
+        if not (needs_to_move_up and needs_to_move_down):
+            if needs_to_move_up:
+                self.position = (
+                    (
+                        self.position[0] + 1
+                        if self.position[0] < 3
+                        else self.position[0]
+                    ),
+                    self.position[1],
+                )
+            elif needs_to_move_down:
+                self.position = (
+                    (
+                        self.position[0] - 1
+                        if self.position[0] > 0
+                        else self.position[0]
+                    ),
+                    self.position[1],
+                )
+
         elif has_both_eyes and not has_both_shoulders:
-            # in this case try to make eyes into the upper 1/3 of the frame,
+            # in this case try to make eyes into the upper 1/5 of the frame,
             eyes_middle = (
                 (keypoint_info["leftEye"][0] + keypoint_info["rightEye"][0]) / 2,
                 (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2,
             )
@@ -329,13 +323,14 @@ def execute(self, userdata):
             # if y at upper 1/3: wonder why no shoulders but never mind in this case
             else:
                 pass
-            # if x at left 1/3 or left shoulder dissappear, move left 1 step
-            if eyes_middle[0] <= 1 / 3:
+            # if x at left 2/7 or left shoulder disappears, move left 1 step
+            if eyes_middle[0] <= 2 / 7:
                 self.position[1] -= 1
-            # if x at right 1/3 or right shoulder dissappear, move right 1 step
-            elif eyes_middle[0] >= 2 / 3:
+            # if x at right 2/7 or right shoulder disappears, move right 1 step
+            elif eyes_middle[0] >= 5 / 7:
                 self.position[1] += 1
             pass
+
         elif not has_both_eyes and has_both_shoulders:
             shoulders_middle = (
                 (
                     keypoint_info["leftShoulder"][0]
                     + keypoint_info["rightShoulder"][0]
                 )
                 / 2,
                 (
                     keypoint_info["leftShoulder"][1]
                     + keypoint_info["rightShoulder"][1]
                 )
                 / 2,
             )
@@ -351,13 +346,14 @@ def execute(self, userdata):
             # if y at upper 1/4: up move 1 step
             elif shoulders_middle[1] <= 1 / 4:
                 self.position[0] += 1
-            # if x at left 1/3, move left 1 step
-            if shoulders_middle[0] <= 1 / 3:
+            # if x at left 2/7, move left 1 step
+            if shoulders_middle[0] <= 2 / 7:
                 self.position[1] -= 1
-            # if x at right 1/3, move right 1 step
-            elif shoulders_middle[0] >= 2 / 3:
+            # if x at right 2/7, move right 1 step
+            elif shoulders_middle[0] >= 5 / 7:
                 self.position[1] += 1
             pass
+
         elif has_both_eyes and has_both_shoulders:
             eyes_middle = (
                 (keypoint_info["leftEye"][0] + keypoint_info["rightEye"][0]) / 2,
                 (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2,
             )
             shoulders_middle = (
                 (
                     keypoint_info["leftShoulder"][0]
                     + keypoint_info["rightShoulder"][0]
                 )
                 / 2,
                 (
                     keypoint_info["leftShoulder"][1]
                     + keypoint_info["rightShoulder"][1]
                 )
                 / 2,
             )
             very_middle = (
                 (eyes_middle[0] + shoulders_middle[0]) / 2,
@@ -376,63 +372,71 @@ def execute(self, userdata):
                 (eyes_middle[1] + shoulders_middle[1]) / 2,
             )
             rospy.logwarn(f"very middle {very_middle}")
-            # if y at upper 1/5 for eyes: move up 1 step
-            if eyes_middle[1] <= 1 / 5:
-                self.position[0] += 1
-                print("if y at upper 1/5 for eyes: move up 1 step")
+            eyes_to_shoulders_distance = math.sqrt(
+                (eyes_middle[0] - shoulders_middle[0]) ** 2
+                + (eyes_middle[1] - shoulders_middle[1]) ** 2
+            )
+            rospy.logwarn(f"eyes to shoulder distance {eyes_to_shoulders_distance}")
+            if eyes_to_shoulders_distance > 0.14:
+                # person is kind of close to the camera,
+                if eyes_to_shoulders_distance >= 0.24:
+                    # person is very close to the camera.
+                    # ACTUALLY, maybe should just tell person to stand back?
+                    # if y at upper 1/5 for eyes: move up 1 step
+                    if eyes_middle[1] <= 1 / 5:
+                        self.position[0] += 1
+                        print("if y at upper 1/5 for eyes: move up 1 step")
+                    else:
+                        # if y at down 1/5: down move 1 step
+                        if very_middle[1] >= 4 / 5:
+                            self.position[0] -= 1
+                            print("if y at down 1/5: down move 1 step.")
+                else:
+                    # if y at upper 1/7 for eyes: move up 1 step
+                    if eyes_middle[1] <= 1 / 7:
+                        self.position[0] += 1
+                        print("if y at upper 1/7 for eyes: move up 1 step")
+                    elif eyes_middle[1] >= 2 / 9:
+                        self.position[0] -= 1
+                        print(
+                            "if y lower than upper 2/9 for eyes: move down 1 step"
+                        )
+                    else:
+                        if (
+                            1 / 4 <= very_middle[1] <= 1 / 2
+                            and 2 / 7 <= very_middle[0] <= 5 / 7
+                        ):
+                            print("finished.")
+                            return "finished"
+                        # if y at down 3/7: down move 1 step
+                        if very_middle[1] >= 4 / 7:
+                            self.position[0] -= 1
+                            print("if y at down 3/7: down move 1 step.")
+                        # if y at upper 1/4: up move 1 step
+                        elif very_middle[1] <= 1 / 4:
+                            self.position[0] += 1
+                            print("if y at upper 1/4: up move 1 step.")
             else:
-                if (
-                    1 / 4 <= very_middle[1] <= 2 / 3
-                    and 1 / 3 <= very_middle[0] <= 2 / 3
-                ):
-                    print("finished.")
-                    return "finished"
-                # if y at down 1/3: down move 1 step
-                if very_middle[1] >= 2 / 3:
+                # person is kind of far from the camera.
+                # in this case simply try to bring the middle-point of the shoulders towards the upper centre
+                if shoulders_middle[1] >= 1 / 2:
                     self.position[0] -= 1
-                    print("if y at down 1/3: down move 1 step.")
-                # if y at upper 1/4: up move 1 step
-                elif very_middle[1] <= 1 / 4:
+                elif shoulders_middle[1] <= 1 / 2 - 2 / 7:
                     self.position[0] += 1
-                    print("if y at upper 1/3: up move 1 step.")
-                # if x at left 1/3, move left 1 step
-                if very_middle[0] <= 1 / 3:
+                elif 2 / 7 <= very_middle[0] <= 5 / 7:
+                    print("finished.")
+                    return "finished"
+                # if x at left 2/7, move left 1 step
+                if very_middle[0] <= 2 / 7:
                     self.position[1] -= 1
-                    print("if x at left 1/3, move left 1 step.")
-                # if x at right 1/3, move right 1 step
-                elif very_middle[0] >= 2 / 3:
+                    print("if x at left 2/7, move left 1 step.")
+                # if x at right 2/7, move right 1 step
+                elif very_middle[0] >= 5 / 7:
                     self.position[1] += 1
-                    print("if x at right 1/3, move right 1 step.")
-            pass
-        elif has_more_than_one_shoulder:  # but not both
-            # shoulders_middle = ((keypoint_info["leftShoulder"][0] + keypoint_info["rightShoulder"][0]) / 2, (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2)
-            # # move one step opposite left or right
-            # # if x at left 1/3, move left 1 step
-            # if shoulders_middle[0] <= 1/3:
-            #     position[1] -= 1
-            # # if x at right 1/3, move right 1 step
-            # elif shoulders_middle[0] >= 2/3:
-            #     position[1] += 1
-            # pass
-            # if not has_more_than_one_one_eye:
-            #     # move up!
-            #     position[0] += 1
-            #     pass
-            pass
-        else:  # has_more_than_one_one_eye:
-            # eyes_middle = ((keypoint_info["leftEye"][0] + keypoint_info["rightEye"][0]) / 2, (keypoint_info["leftEye"][1] + keypoint_info["rightEye"][1]) / 2)
-            # # move one step opposite,
-            # # if x at left 1/3, move left 1 step
-            # if eyes_middle[0] <= 1/3:
-            #     position[1] += 1
-            # # if x at right 1/3, move right 1 step
-            # elif eyes_middle[0] >= 2/3:
-            #     position[1] -= 1
-            # # probably move down
-            # position[0] -= 1
-            # pass
+                    print("if x at right 2/7, move right 1 step.")
             pass
 
+        # keep the position in the range.
         if self.position[0] < 0:
             self.position[0] = 0
         elif self.position[0] > 3:
@@ -442,4 +446,9 @@ def execute(self, userdata):
         elif self.position[1] > 1:
             self.position[1] = 1
 
+        # if counter > maximum, give up.
+ if self.counter > self.max_attempts: + return "truncated" + self.counter += 1 + return position_dict[(self.position[0], self.position[1])] diff --git a/skills/src/lasr_skills/clip_vqa.py b/skills/src/lasr_skills/clip_vqa.py index ad95ad302..b0722f99a 100755 --- a/skills/src/lasr_skills/clip_vqa.py +++ b/skills/src/lasr_skills/clip_vqa.py @@ -1,3 +1,4 @@ +import smach import smach_ros from lasr_vision_msgs.srv import Vqa, VqaRequest @@ -6,17 +7,24 @@ class QueryImage(smach_ros.ServiceState): def __init__(self, possible_answers: Union[None, List[str]] = None): + if possible_answers is not None: - super(QueryImage, self).__init__( - "/clip_vqa/query_service", - Vqa, - request=VqaRequest(possible_answers=possible_answers), - response_slots=["answer", "similarity"], - ) + + @smach.cb_interface(input_keys=["img_msg"]) + def request_cb(userdata, request): + return VqaRequest( + possible_answers=possible_answers, image=userdata.img_msg + ) + else: - super(QueryImage, self).__init__( - "/clip_vqa/query_service", - Vqa, - request_slots=["possible_answers"], - response_slots=["answer", "similarity"], - ) + + @smach.cb_interface(input_keys=["img_msg"]) + def request_cb(userdata, request): + return VqaRequest(image=userdata.img_msg) + + super(QueryImage, self).__init__( + "/clip_vqa/query_service", + Vqa, + request_cb=request_cb, + response_slots=["answer", "similarity"], + ) diff --git a/skills/src/lasr_skills/detect_clothing.py b/skills/src/lasr_skills/detect_clothing.py new file mode 100644 index 000000000..0c599905b --- /dev/null +++ b/skills/src/lasr_skills/detect_clothing.py @@ -0,0 +1,18 @@ +import smach + +from typing import Optional + + +class DetectClothing(smach.State): + + _clothing_to_detect: Optional[str] + + def __init__(self, clothing_to_detect: Optional[str] = None): + smach.State.__init__( + self, outcomes=["succeeded", "failed"], input_keys=["img_msg"] + ) + + self._clothing_to_detect = clothing_to_detect + + def execute(self, userdata): + return "succeeded" diff --git a/skills/src/lasr_skills/detect_gesture.py b/skills/src/lasr_skills/detect_gesture.py index 62e348342..58240690d 100755 --- a/skills/src/lasr_skills/detect_gesture.py +++ b/skills/src/lasr_skills/detect_gesture.py @@ -1,10 +1,7 @@ -#!/usr/bin/env python3 -from typing import Optional, List import smach import rospy import cv2 import cv2_img -from lasr_skills.vision import GetCroppedImage, GetImage from lasr_vision_msgs.srv import ( BodyPixKeypointDetection, BodyPixKeypointDetectionRequest, @@ -16,6 +13,7 @@ import tf2_ros as tf from visualization_msgs.msg import Marker from markers import create_and_publish_marker +from typing import Tuple, Union class DetectGesture(smach.State): @@ -25,13 +23,12 @@ class DetectGesture(smach.State): def __init__( self, - gesture_to_detect: Optional[str] = None, + gesture_to_detect: Union[str, None] = None, bodypix_model: str = "resnet50", bodypix_confidence: float = 0.1, buffer_width: int = 50, debug_publisher: str = "/skills/gesture_detection/debug", ): - """Optionally stores the gesture to detect. 
If None, it will infer the gesture from the keypoints.""" smach.State.__init__( self, outcomes=["succeeded", "missing_keypoints", "failed"], @@ -71,6 +68,8 @@ def execute(self, userdata): detected_keypoints = res.keypoints + detected_gesture = "none" + keypoint_info = { keypoint.keypoint_name: {"x": keypoint.x, "y": keypoint.y} for keypoint in detected_keypoints @@ -169,43 +168,9 @@ def execute(self, userdata): # Publish the image self.debug_publisher.publish(cv2_img.cv2_img_to_msg(cv2_gesture_img)) - return "succeeded" - - -### For example usage: -class GestureDetectionSM(smach.StateMachine): - """ - State machine for detecting gestures. - """ - - def __init__(self, gesture_to_detect: Optional[str] = None): - smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) - self.gesture_to_detect = gesture_to_detect - with self: - smach.StateMachine.add( - "GET_IMAGE", - GetCroppedImage( - "person", "centered", use_mask=True, rgb_topic="/usb_cam/image_raw" - ), - transitions={"succeeded": "BODY_PIX_DETECTION", "failed": "failed"}, - ) - - smach.StateMachine.add( - "BODY_PIX_DETECTION", - DetectGesture(gesture_to_detect=self.gesture_to_detect), - transitions={ - "succeeded": "succeeded", - "failed": "failed", - "missing_keypoints": "failed", - }, + if self.gesture_to_detect is not None: + return ( + "succeeded" if detected_gesture == self.gesture_to_detect else "failed" ) - - -if __name__ == "__main__": - rospy.init_node("gesture_detection_sm") - ### Example usage: - while not rospy.is_shutdown(): - sm = GestureDetectionSM() - sm.execute() - - rospy.spin() + else: + return "succeeded" diff --git a/skills/src/lasr_skills/detect_pose.py b/skills/src/lasr_skills/detect_pose.py new file mode 100644 index 000000000..c5d2c4f10 --- /dev/null +++ b/skills/src/lasr_skills/detect_pose.py @@ -0,0 +1,67 @@ +import smach + +from typing import Optional, List, Dict + +from lasr_skills import QueryImage + + +class DetectPose(smach.StateMachine): + + _pose_dict: Dict[str, str] = { + "a person standing": "standing", + "a person sitting": "sitting", + "a person laying down": "lying_down", + } + + _possible_answers: List[str] = [ + "a person standing", + "a person sitting", + "a person laying down", + ] + _pose_to_detect: Optional[str] + + class HandleResponse(smach.State): + + _pose_dict: Dict[str, str] + _pose_to_detect: Optional[str] + + def __init__( + self, pose_dict: Dict[str, str], pose_to_detect: Optional[str] = None + ): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["answer"], + output_keys=[] if pose_to_detect is None else ["detected_pose"], + ) + + self._pose_dict = pose_dict + self._pose_to_detect = pose_to_detect + + def execute(self, userdata): + if self._pose_to_detect is None: + userdata.detected_pose = self._pose_dict[userdata.answer] + return "succeeded" + else: + if self._pose_dict[userdata.answer] == self._pose_to_detect: + return "succeeded" + return "failed" + + def __init__(self, pose_to_detect: Optional[str] = None): + smach.StateMachine.__init__( + self, outcomes=["succeeded", "failed"], input_keys=["img_msg"] + ) + + self._pose_to_detect = pose_to_detect + + with self: + smach.StateMachine.add( + "QUERY_IMAGE", + QueryImage(possible_answers=self._possible_answers), + transitions={"succeeded": "HANDLE_RESPONSE", "failed": "failed"}, + ) + smach.StateMachine.add( + "HANDLE_RESPONSE", + self.HandleResponse(self._pose_dict, self._pose_to_detect), + transitions={"succeeded": "succeeded", "failed": "failed"}, + ) diff --git 
a/skills/src/lasr_skills/find_gesture_person.py b/skills/src/lasr_skills/find_gesture_person.py deleted file mode 100644 index ae195494a..000000000 --- a/skills/src/lasr_skills/find_gesture_person.py +++ /dev/null @@ -1,221 +0,0 @@ -import smach -import rospy - -from lasr_skills import Detect3D, GoToLocation, DetectGesture, PlayMotion -from lasr_skills.vision import GetImage -import navigation_helpers - -from geometry_msgs.msg import Pose, PoseWithCovarianceStamped, Point, Quaternion - - -from typing import List, Union - - -class FindGesturePerson(smach.StateMachine): - class GetLocation(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["location_index", "waypoints"], - output_keys=["location"], - ) - - def execute(self, userdata): - userdata.location = userdata.waypoints[userdata.location_index] - return "succeeded" - - class GetPose(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - output_keys=["current_pose"], - ) - - def execute(self, userdata): - userdata.current_pose = rospy.wait_for_message( - "/amcl_pose", PoseWithCovarianceStamped - ).pose.pose - return "succeeded" - - class ComputePath(smach.State): - def __init__(self, waypoints: List[Pose]): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["current_pose"], - output_keys=["waypoints"], - ) - self._waypoints = waypoints - - def execute(self, userdata): - userdata.waypoints = navigation_helpers.min_hamiltonian_path( - userdata.current_pose, self._waypoints - ) - return "succeeded" - - class HandleDetections(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["detections_3d"], - output_keys=["person_point"], - ) - - def execute(self, userdata): - if len(userdata.detections_3d.detected_objects) == 0: - rospy.logwarn("No person detected, returning failed.") - return "failed" - userdata.person_point = userdata.detections_3d.detected_objects[0].point - return "succeeded" - - class HandleResponse(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["gesture_detected"], - ) - - def execute(self, userdata): - if userdata.gesture_detected: - return "succeeded" - return "failed" - - def __init__( - self, - gesture: str, - waypoints: Union[List[Pose], None] = None, - location_param: Union[str, None] = None, - ): - smach.StateMachine.__init__( - self, outcomes=["succeeded", "failed"], output_keys=["person_point"] - ) - if waypoints is None and location_param is None: - raise ValueError("Either waypoints or location_param must be provided") - - if gesture == "person raising their left arm": - self.gesture = "raising_left_arm" - elif gesture == "person raising their right arm": - self.gesture = "raising_right_arm" - elif gesture == "person pointing to the right": - self.gesture == "pointing_to_the_right" - elif gesture == "person pointing to the left": - self.gesture == "pointing_to_the_left" - else: - raise ValueError(f"Gesture {gesture} not recognized") - - if waypoints is None: - waypoints_to_iterate: List[Pose] = [] - - room = rospy.get_param(location_param) - beacons = room["beacons"] - for beacon in beacons: - waypoint: Pose = Pose( - position=Point(**beacons[beacon]["near_pose"]["position"]), - orientation=Quaternion( - **beacons[beacon]["near_pose"]["orientation"] - ), - ) - waypoints_to_iterate.append(waypoint) - else: - waypoints_to_iterate: List[Pose] 
= waypoints - - with self: - smach.StateMachine.add( - "GET_POSE", - self.GetPose(), - transitions={"succeeded": "COMPUTE_PATH", "failed": "failed"}, - ) - - smach.StateMachine.add( - "COMPUTE_PATH", - self.ComputePath(waypoints_to_iterate), - transitions={"succeeded": "WAYPOINT_ITERATOR", "failed": "failed"}, - ) - - waypoint_iterator = smach.Iterator( - outcomes=["succeeded", "failed"], - it=lambda: range(len(waypoints_to_iterate)), - it_label="location_index", - input_keys=["waypoints"], - output_keys=["person_point"], - exhausted_outcome="failed", - ) - - with waypoint_iterator: - container_sm = smach.StateMachine( - outcomes=["succeeded", "failed", "continue"], - input_keys=["location_index", "waypoints"], - output_keys=["person_point"], - ) - - with container_sm: - smach.StateMachine.add( - "GET_LOCATION", - self.GetLocation(), - transitions={"succeeded": "GO_TO_LOCATION", "failed": "failed"}, - ) - - smach.StateMachine.add( - "GO_TO_LOCATION", - GoToLocation(), - transitions={ - "succeeded": "DETECT3D", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "DETECT3D", - Detect3D(filter=["person"]), - transitions={ - "succeeded": "HANDLE_DETECTIONS", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "HANDLE_DETECTIONS", - self.HandleDetections(), - transitions={ - "succeeded": "PLAY_MOTION", - "failed": "continue", - }, - ), - smach.StateMachine.add( - "PLAY_MOTION", - PlayMotion(motion_name="raise_torso"), - transitions={ - "succeeded": "GET_IMAGE", - "aborted": "failed", - "preempted": "failed", - }, - ), - smach.StateMachine.add( - "GET_IMAGE", - GetImage(), - transitions={"succeeded": "CHECK_GESTURE", "failed": "failed"}, - ) - smach.StateMachine.add( - "CHECK_GESTURE", - DetectGesture(self.gesture), - transitions={ - "succeeded": "HANDLE_RESPONSE", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "HANDLE_RESPONSE", - self.HandleResponse(), - transitions={"succeeded": "succeeded", "failed": "continue"}, - ) - - waypoint_iterator.set_contained_state( - "CONTAINER_STATE", container_sm, loop_outcomes=["continue"] - ) - smach.StateMachine.add( - "WAYPOINT_ITERATOR", - waypoint_iterator, - {"succeeded": "succeeded", "failed": "failed"}, - ) diff --git a/skills/src/lasr_skills/find_named_person.py b/skills/src/lasr_skills/find_named_person.py deleted file mode 100755 index 0631bbc57..000000000 --- a/skills/src/lasr_skills/find_named_person.py +++ /dev/null @@ -1,196 +0,0 @@ -import smach -import rospy - -from lasr_skills import Detect3D, GoToLocation, AskAndListen -import navigation_helpers - -from geometry_msgs.msg import Pose, PoseWithCovarianceStamped, Point, Quaternion - -from typing import List, Union - - -class FindNamedPerson(smach.StateMachine): - class GetLocation(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["location_index", "waypoints"], - output_keys=["location"], - ) - - def execute(self, userdata): - userdata.location = userdata.waypoints[userdata.location_index] - return "succeeded" - - class GetPose(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - output_keys=["current_pose"], - ) - - def execute(self, userdata): - userdata.current_pose = rospy.wait_for_message( - "/amcl_pose", PoseWithCovarianceStamped - ).pose.pose - return "succeeded" - - class ComputePath(smach.State): - def __init__(self, waypoints: List[Pose]): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["current_pose"], - 
output_keys=["waypoints"], - ) - self._waypoints = waypoints - - def execute(self, userdata): - userdata.waypoints = navigation_helpers.min_hamiltonian_path( - userdata.current_pose, self._waypoints - ) - return "succeeded" - - class HandleDetections(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["detections_3d"], - output_keys=["person_point"], - ) - - def execute(self, userdata): - if len(userdata.detections_3d.detected_objects) == 0: - rospy.logwarn("No person detected, returning failed.") - return "failed" - userdata.person_point = userdata.detections_3d.detected_objects[0].point - return "succeeded" - - class HandleResponse(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - input_keys=["transcribed_speech"], - ) - - def execute(self, userdata): - # TODO: make this smarter,e.g. levenshtein distance - if "yes" in userdata.transcribed_speech.lower(): - return "succeeded" - return "failed" - - def __init__( - self, - name: str, - waypoints: Union[List[Pose], None] = None, - location_param: Union[str, None] = None, - ): - smach.StateMachine.__init__( - self, outcomes=["succeeded", "failed"], output_keys=["person_point"] - ) - - if waypoints is None and location_param is None: - raise ValueError("Either waypoints or location_param must be provided") - - if waypoints is None: - waypoints_to_iterate: List[Pose] = [] - - room = rospy.get_param(location_param) - beacons = room["beacons"] - for beacon in beacons: - waypoint: Pose = Pose( - position=Point(**beacons[beacon]["near_pose"]["position"]), - orientation=Quaternion( - **beacons[beacon]["near_pose"]["orientation"] - ), - ) - waypoints_to_iterate.append(waypoint) - else: - waypoints_to_iterate: List[Pose] = waypoints - - with self: - smach.StateMachine.add( - "GET_POSE", - self.GetPose(), - transitions={"succeeded": "COMPUTE_PATH", "failed": "failed"}, - ) - - smach.StateMachine.add( - "COMPUTE_PATH", - self.ComputePath(waypoints_to_iterate), - transitions={"succeeded": "WAYPOINT_ITERATOR", "failed": "failed"}, - ) - - waypoint_iterator = smach.Iterator( - outcomes=["succeeded", "failed"], - it=lambda: range(len(waypoints_to_iterate)), - it_label="location_index", - input_keys=["waypoints"], - output_keys=["person_point"], - exhausted_outcome="failed", - ) - - with waypoint_iterator: - container_sm = smach.StateMachine( - outcomes=["succeeded", "failed", "continue"], - input_keys=["location_index", "waypoints"], - output_keys=["person_point"], - ) - - with container_sm: - smach.StateMachine.add( - "GET_LOCATION", - self.GetLocation(), - transitions={"succeeded": "GO_TO_LOCATION", "failed": "failed"}, - ) - - smach.StateMachine.add( - "GO_TO_LOCATION", - GoToLocation(), - transitions={ - "succeeded": "DETECT3D", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "DETECT3D", - Detect3D(filter=["person"]), - transitions={ - "succeeded": "HANDLE_DETECTIONS", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "HANDLE_DETECTIONS", - self.HandleDetections(), - transitions={ - "succeeded": "CHECK_NAME", - "failed": "continue", - }, - ) - smach.StateMachine.add( - "CHECK_NAME", - AskAndListen(f"I'm looking for {name}. 
Are you {name}?"), - transitions={ - "succeeded": "HANDLE_RESPONSE", - "failed": "failed", - }, - ) - smach.StateMachine.add( - "HANDLE_RESPONSE", - self.HandleResponse(), - transitions={"succeeded": "succeeded", "failed": "continue"}, - ) - - waypoint_iterator.set_contained_state( - "CONTAINER_STATE", container_sm, loop_outcomes=["continue"] - ) - smach.StateMachine.add( - "WAYPOINT_ITERATOR", - waypoint_iterator, - {"succeeded": "succeeded", "failed": "failed"}, - ) diff --git a/skills/src/lasr_skills/find_person.py b/skills/src/lasr_skills/find_person.py index e975f25d5..59c452275 100755 --- a/skills/src/lasr_skills/find_person.py +++ b/skills/src/lasr_skills/find_person.py @@ -1,15 +1,38 @@ +#!/usr/bin/env python3 import smach +import smach_ros import rospy -from lasr_skills import Detect3D, GoToLocation +from lasr_skills import ( + GoToLocation, + AskAndListen, + DetectGesture, + DetectClothing, + DetectPose, +) import navigation_helpers -from geometry_msgs.msg import Pose, PoseWithCovarianceStamped +from geometry_msgs.msg import ( + Pose, + PoseWithCovarianceStamped, + Polygon, + PoseStamped, + Point, + Quaternion, +) +from lasr_vision_msgs.msg import CDRequest, CDResponse +from lasr_vision_msgs.srv import ( + CroppedDetectionRequest, + CroppedDetection, +) -from typing import List + +from typing import List, Literal +import itertools class FindPerson(smach.StateMachine): + class GetLocation(smach.State): def __init__(self): smach.State.__init__( @@ -19,24 +42,10 @@ def __init__(self): output_keys=["location"], ) - def execute(self, userdata): + def execute(self, userdata) -> str: userdata.location = userdata.waypoints[userdata.location_index] return "succeeded" - class GetPose(smach.State): - def __init__(self): - smach.State.__init__( - self, - outcomes=["succeeded", "failed"], - output_keys=["current_pose"], - ) - - def execute(self, userdata): - userdata.current_pose = rospy.wait_for_message( - "/amcl_pose", PoseWithCovarianceStamped - ).pose.pose - return "succeeded" - class ComputePath(smach.State): def __init__(self, waypoints: List[Pose]): smach.State.__init__( @@ -47,39 +56,322 @@ def __init__(self, waypoints: List[Pose]): ) self._waypoints = waypoints - def execute(self, userdata): + def execute(self, userdata) -> str: + current_pose: Pose = rospy.wait_for_message( + "/robot_pose", PoseWithCovarianceStamped + ).pose.pose userdata.waypoints = navigation_helpers.min_hamiltonian_path( - userdata.current_pose, self._waypoints + current_pose, self._waypoints ) return "succeeded" - class HandleDetections(smach.State): - def __init__(self): - smach.State.__init__( + class HandleDetections(smach.StateMachine): + + class GetResponse(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["responses"], + output_keys=[ + "response", + "responses", + "person_point", + "cropped_image", + ], + ) + + def execute(self, userdata): + if len(userdata.responses[0].detections_3d) == 0: + rospy.logwarn("No response available, returning failed.") + return "failed" + response = userdata.responses[0].detections_3d.pop(0) + userdata.response = response + userdata.cropped_image = userdata.responses[0].cropped_imgs.pop(0) + userdata.person_point = response.point + return "succeeded" + + class ApproachPerson(smach.StateMachine): + + class ComputeApproachPose(smach.State): + + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["person_point"], + output_keys=["approach_pose"], + ) + + def 
execute(self, userdata): + robot_pose_with_covariance = rospy.wait_for_message( + "/robot_pose", PoseWithCovarianceStamped + ) + robot_pose = PoseStamped( + pose=robot_pose_with_covariance.pose.pose, + header=robot_pose_with_covariance.header, + ) + + person_pose = PoseStamped( + pose=Pose( + position=userdata.person_point, + orientation=robot_pose.pose.orientation, + ), + header=robot_pose.header, + ) + approach_pose = navigation_helpers.get_pose_on_path( + robot_pose, + person_pose, + ) + rospy.loginfo(approach_pose) + + if approach_pose is None: + return "failed" + + approach_pose.pose.orientation = ( + navigation_helpers.compute_face_quat( + approach_pose.pose, + person_pose.pose, + ) + ) + userdata.approach_pose = approach_pose.pose + + return "succeeded" + + def __init__(self): + smach.StateMachine.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["person_point"], + ) + + with self: + + smach.StateMachine.add( + "COMPUTE_APPROACH_POSE", + self.ComputeApproachPose(), + transitions={"succeeded": "GO_TO_PERSON", "failed": "failed"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + GoToLocation(), + transitions={ + "succeeded": "succeeded", + "failed": "failed", + }, + remapping={"location": "approach_pose"}, + ) + + def __init__( + self, + criteria: Literal["name", "pose", "gesture", "clothes"], + criteria_value: str, + ): + smach.StateMachine.__init__( self, outcomes=["succeeded", "failed"], - input_keys=["detections_3d"], - output_keys=["person_point"], + input_keys=["responses"], + output_keys=["responses", "person_point"], ) - def execute(self, userdata): - if len(userdata.detections_3d.detected_objects) == 0: - rospy.logwarn("No person detected, returning failed.") - return "failed" - userdata.person_point = userdata.detections_3d.detected_objects[0].point - return "succeeded" + with self: + + if criteria == "name": + + class HandleSpeechResponse(smach.State): + + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["transcribed_speech"], + ) + + def execute(self, userdata): + # TODO: make this smarter, e.g. Levenshtein distance + if "yes" in userdata.transcribed_speech.lower(): + return "succeeded" + return "failed" + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={ + "succeeded": "GO_TO_PERSON", + "failed": "failed", + }, + remapping={"img_msg": "cropped_image"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + self.ApproachPerson(), + transitions={ + "succeeded": "CHECK_NAME", + "failed": "GET_RESPONSE", + }, + remapping={"location": "approach_pose"}, + ) + + smach.StateMachine.add( + "CHECK_NAME", + AskAndListen( + f"I'm looking for {criteria_value}. Are you {criteria_value}?"
+ ), + transitions={ + "succeeded": "HANDLE_SPEECH_RESPONSE", + "failed": "failed", + }, + ) + smach.StateMachine.add( + "HANDLE_SPEECH_RESPONSE", + HandleSpeechResponse(), + transitions={ + "succeeded": "succeeded", + "failed": "GET_RESPONSE", + }, + ) + + elif criteria == "gesture": + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={"succeeded": "DETECT_GESTURE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "DETECT_GESTURE", + DetectGesture(criteria_value), + transitions={ + "succeeded": "GO_TO_PERSON", + "missing_keypoints": "GET_RESPONSE", + "failed": "GET_RESPONSE", + }, + remapping={"img_msg": "cropped_image"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + self.ApproachPerson(), + transitions={ + "succeeded": "succeeded", + "failed": "GET_RESPONSE", + }, + remapping={"location": "approach_pose"}, + ) + + elif criteria == "pose": + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={ + "succeeded": "DETECT_POSE", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "DETECT_POSE", + DetectPose(criteria_value), + transitions={ + "succeeded": "GO_TO_PERSON", + "failed": "GET_RESPONSE", + }, + remapping={"img_msg": "cropped_image"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + self.ApproachPerson(), + transitions={ + "succeeded": "succeeded", + "failed": "GET_RESPONSE", + }, + remapping={"location": "approach_pose"}, + ) + + elif criteria == "clothes": + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={ + "succeeded": "DETECT_CLOTHING", + "failed": "failed", + }, + ) - def __init__(self, waypoints: List[Pose]): - smach.StateMachine.__init__( - self, outcomes=["succeeded", "failed"], output_keys=["person_point"] - ) + smach.StateMachine.add( + "DETECT_CLOTHING", + DetectClothing(criteria_value), + transitions={ + "succeeded": "GO_TO_PERSON", + "failed": "GET_RESPONSE", + }, + remapping={"img_msg": "cropped_image"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + self.ApproachPerson(), + transitions={ + "succeeded": "succeeded", + "failed": "GET_RESPONSE", + }, + remapping={"location": "approach_pose"}, + ) + + def __init__( + self, + waypoints: List[Pose], + polygon: Polygon, + criteria: Literal["name", "pose", "gesture", "clothes"], + criteria_value: str, + ): + + assert criteria in ["name", "pose", "gesture", "clothes"], "Invalid criteria" + + if criteria == "gesture": + assert criteria_value in [ + "raising_left_arm", + "raising_right_arm", + "pointing_to_the_right", + "pointing_to_the_left", + "waving", + ], "Invalid gesture" + elif criteria == "pose": + assert criteria_value in [ + "sitting", + "standing", + "lying_down", + ], "Invalid pose" + elif criteria == "clothes": + color_list = ["blue", "yellow", "black", "white", "red", "orange", "gray"] + clothe_list = ["t shirt", "shirt", "blouse", "sweater", "coat", "jacket"] + clothes_list = [ + "t shirts", + "shirts", + "blouses", + "sweaters", + "coats", + "jackets", + ] + color_clothe_list: List[str] = [] + for a, b in list(itertools.product(color_list, clothe_list)): + color_clothe_list = color_clothe_list + [a + " " + b] + color_clothes_list: List[str] = [] + for a, b in list(itertools.product(color_list, clothes_list)): + color_clothes_list = color_clothes_list + [a + " " + b] + assert ( + criteria_value in color_clothe_list + color_clothes_list + ), "Invalid clothing" + + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) with self: - smach.StateMachine.add( - "GET_POSE", - self.GetPose(), 
- transitions={"succeeded": "COMPUTE_PATH", "failed": "failed"}, - ) smach.StateMachine.add( "COMPUTE_PATH", @@ -114,27 +406,48 @@ def __init__(self, waypoints: List[Pose]): "GO_TO_LOCATION", GoToLocation(), transitions={ - "succeeded": "DETECT3D", + "succeeded": "DETECT", "failed": "failed", }, ) + smach.StateMachine.add( - "DETECT3D", - Detect3D(filter=["person"]), + "DETECT", + smach_ros.ServiceState( + "/vision/cropped_detection", + CroppedDetection, + request=CroppedDetectionRequest( + requests=[ + CDRequest( + method="closest", + use_mask=True, + yolo_model="yolov8x-seg.pt", + yolo_model_confidence=0.5, + yolo_nms_threshold=0.3, + return_sensor_reading=False, + object_names=["person"], + polygons=[polygon], + ) + ] + ), + output_keys=["responses"], + response_slots=["responses"], + ), transitions={ "succeeded": "HANDLE_DETECTIONS", - "failed": "failed", + "aborted": "failed", + "preempted": "failed", }, ) + smach.StateMachine.add( "HANDLE_DETECTIONS", - self.HandleDetections(), + self.HandleDetections(criteria, criteria_value), transitions={ "succeeded": "succeeded", "failed": "continue", }, ) - waypoint_iterator.set_contained_state( "CONTAINER_STATE", container_sm, loop_outcomes=["continue"] ) @@ -143,3 +456,42 @@ def __init__(self, waypoints: List[Pose]): waypoint_iterator, {"succeeded": "succeeded", "failed": "failed"}, ) + + +if __name__ == "__main__": + import rospy + from geometry_msgs.msg import Point, Quaternion, Pose + + rospy.init_node("find_person") + + waypoints = [ + Pose( + position=Point(x=2.46, y=-2.15, z=0), + orientation=Quaternion(x=0, y=0, z=-0.993, w=0.116), + ), + Pose( + position=Point(x=0.88, y=1.04, z=0), + orientation=Quaternion(x=0, y=0, z=0.125, w=0.992), + ), + Pose( + position=Point(x=4.05, y=-2.16, z=0), + orientation=Quaternion(x=0, y=0, z=-0.995, w=0.090), + ), + ] + + polygon = Polygon( + [ + Point(x=-0.4806538224220276, y=-5.140193462371826, z=0.002532958984375), + Point(x=6.777748107910156, y=-5.0068678855896, z=0.002532958984375), + Point(x=7.11236047744751, y=2.4408864974975586, z=-0.001434326171875), + Point(x=-4.766469955444336, y=2.666473627090454, z=-0.005340576171875), + ] + ) + + find_person = FindPerson( + waypoints=waypoints, + polygon=polygon, + criteria="name", + criteria_value="jared", + ) + print(find_person.execute()) diff --git a/skills/src/lasr_skills/find_person_and_tell.py b/skills/src/lasr_skills/find_person_and_tell.py new file mode 100644 index 000000000..2e3fc2764 --- /dev/null +++ b/skills/src/lasr_skills/find_person_and_tell.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +import smach +import smach_ros +import rospy + +from lasr_skills import ( + GoToLocation, + AskAndListen, + DetectGesture, + DetectClothing, + DetectPose, +) +import navigation_helpers + +from geometry_msgs.msg import ( + Pose, + PoseWithCovarianceStamped, + Polygon, + PoseStamped, + Point, + Quaternion, +) +from lasr_vision_msgs.msg import CDRequest, CDResponse +from lasr_vision_msgs.srv import ( + CroppedDetectionRequest, + CroppedDetection, +) + + +from typing import List, Literal + + +class FindPersonAndTell(smach.StateMachine): + + class GetLocation(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["location_index", "waypoints"], + output_keys=["location"], + ) + + def execute(self, userdata) -> str: + userdata.location = userdata.waypoints[userdata.location_index] + return "succeeded" + + class ComputePath(smach.State): + def __init__(self, waypoints: List[Pose]): + 
smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["current_pose"], + output_keys=["waypoints"], + ) + self._waypoints = waypoints + + def execute(self, userdata) -> str: + current_pose: Pose = rospy.wait_for_message( + "/robot_pose", PoseWithCovarianceStamped + ).pose.pose + userdata.waypoints = navigation_helpers.min_hamiltonian_path( + current_pose, self._waypoints + ) + return "succeeded" + + class HandleDetections(smach.StateMachine): + + class GetResponse(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["responses"], + output_keys=[ + "response", + "responses", + "person_point", + "cropped_image", + ], + ) + + def execute(self, userdata): + if len(userdata.responses[0].detections_3d) == 0: + rospy.logwarn("No response available, returning failed.") + return "failed" + response = userdata.responses[0].detections_3d.pop(0) + userdata.response = response + userdata.cropped_image = userdata.responses[0].cropped_imgs.pop(0) + userdata.person_point = response.point + return "succeeded" + + class ApproachPerson(smach.StateMachine): + + class ComputeApproachPose(smach.State): + + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["person_point"], + output_keys=["approach_pose"], + ) + + def execute(self, userdata): + robot_pose_with_covariance = rospy.wait_for_message( + "/robot_pose", PoseWithCovarianceStamped + ) + robot_pose = PoseStamped( + pose=robot_pose_with_covariance.pose.pose, + header=robot_pose_with_covariance.header, + ) + + person_pose = PoseStamped( + pose=Pose( + position=userdata.person_point, + orientation=robot_pose.pose.orientation, + ), + header=robot_pose.header, + ) + approach_pose = navigation_helpers.get_pose_on_path( + robot_pose, + person_pose, + ) + rospy.loginfo(approach_pose) + + if approach_pose is None: + return "failed" + + approach_pose.pose.orientation = ( + navigation_helpers.compute_face_quat( + approach_pose.pose, + person_pose.pose, + ) + ) + userdata.approach_pose = approach_pose.pose + + return "succeeded" + + def __init__(self): + smach.StateMachine.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["person_point"], + ) + + with self: + + smach.StateMachine.add( + "COMPUTE_APPROACH_POSE", + self.ComputeApproachPose(), + transitions={"succeeded": "GO_TO_PERSON", "failed": "failed"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + GoToLocation(), + transitions={ + "succeeded": "succeeded", + "failed": "failed", + }, + remapping={"location": "approach_pose"}, + ) + + def __init__( + self, + query: Literal["name", "pose", "gesture", "clothes"], + ): + smach.StateMachine.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["responses"], + output_keys=["responses", "query_result"], + ) + + with self: + + if query == "name": + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={ + "succeeded": "GO_TO_PERSON", + "failed": "failed", + }, + remapping={"img_msg": "cropped_image"}, + ) + + smach.StateMachine.add( + "GO_TO_PERSON", + self.ApproachPerson(), + transitions={ + "succeeded": "ASK_NAME", + "failed": "GET_RESPONSE", + }, + remapping={"location": "approach_pose"}, + ) + + smach.StateMachine.add( + "ASK_NAME", + AskAndListen( + "What is your name?", + ), + transitions={ + "succeeded": "succeeded", + "failed": "failed", + }, + remapping={"transcribed_speech": "query_result"}, + ) + + elif query == "gesture": + + smach.StateMachine.add( +
"GET_RESPONSE", + self.GetResponse(), + transitions={"succeeded": "DETECT_GESTURE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "DETECT_GESTURE", + DetectGesture(), + transitions={ + "succeeded": "succeeded", + "failed": "GET_RESPONSE", + }, + remapping={ + "img_msg": "cropped_image", + "detected_gesture": "query_result", + }, + ) + + elif query == "pose": + + smach.StateMachine.add( + "GET_RESPONSE", + self.GetResponse(), + transitions={ + "succeeded": "DETECT_POSE", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "DETECT_POSE", + DetectPose(), + transitions={ + "succeeded": "GO_TO_PERSON", + "failed": "GET_RESPONSE", + }, + remapping={ + "img_msg": "cropped_image", + "detected_pose": "query_result", + }, + ) + + def __init__( + self, + waypoints: List[Pose], + polygon: Polygon, + query: Literal["name", "pose", "gesture"], + ): + + assert query in ["name", "pose", "gesture"], "Invalid query" + + smach.StateMachine.__init__( + self, outcomes=["succeeded", "failed"], output_keys=["query_result"] + ) + + with self: + + smach.StateMachine.add( + "COMPUTE_PATH", + self.ComputePath(waypoints), + transitions={"succeeded": "WAYPOINT_ITERATOR", "failed": "failed"}, + ) + + waypoint_iterator = smach.Iterator( + outcomes=["succeeded", "failed"], + it=lambda: range(len(waypoints)), + it_label="location_index", + input_keys=["waypoints"], + output_keys=["person_point"], + exhausted_outcome="failed", + ) + + with waypoint_iterator: + container_sm = smach.StateMachine( + outcomes=["succeeded", "failed", "continue"], + input_keys=["location_index", "waypoints"], + output_keys=["person_point"], + ) + + with container_sm: + smach.StateMachine.add( + "GET_LOCATION", + self.GetLocation(), + transitions={"succeeded": "GO_TO_LOCATION", "failed": "failed"}, + ) + + smach.StateMachine.add( + "GO_TO_LOCATION", + GoToLocation(), + transitions={ + "succeeded": "DETECT", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "DETECT", + smach_ros.ServiceState( + "/vision/cropped_detection", + CroppedDetection, + request=CroppedDetectionRequest( + requests=[ + CDRequest( + method="closest", + use_mask=True, + yolo_model="yolov8x-seg.pt", + yolo_model_confidence=0.5, + yolo_nms_threshold=0.3, + return_sensor_reading=False, + object_names=["person"], + polygons=[polygon], + ) + ] + ), + output_keys=["responses"], + response_slots=["responses"], + ), + transitions={ + "succeeded": "HANDLE_DETECTIONS", + "aborted": "failed", + "preempted": "failed", + }, + ) + + smach.StateMachine.add( + "HANDLE_DETECTIONS", + self.HandleDetections(query), + transitions={ + "succeeded": "succeeded", + "failed": "continue", + }, + ) + waypoint_iterator.set_contained_state( + "CONTAINER_STATE", container_sm, loop_outcomes=["continue"] + ) + smach.StateMachine.add( + "WAYPOINT_ITERATOR", + waypoint_iterator, + {"succeeded": "succeeded", "failed": "failed"}, + ) diff --git a/skills/src/lasr_skills/go_to_location.py b/skills/src/lasr_skills/go_to_location.py index 3d30129e8..881a2535f 100755 --- a/skills/src/lasr_skills/go_to_location.py +++ b/skills/src/lasr_skills/go_to_location.py @@ -10,6 +10,11 @@ from lasr_skills import PlayMotion +import rospkg +import rosparam + +import os + PUBLIC_CONTAINER = False try: @@ -36,6 +41,13 @@ def __init__( outcomes=["succeeded", "failed"], input_keys=["location"] ) + r = rospkg.RosPack() + els = rosparam.load_file( + os.path.join(r.get_path("lasr_skills"), "config", "motions.yaml") + ) + for param, ns in els: + rosparam.upload_params(ns, param) + IS_SIMULATION = ( 
"/pal_startup_control/start" not in rosservice.get_service_list() ) diff --git a/skills/src/lasr_skills/guide.py b/skills/src/lasr_skills/guide.py new file mode 100644 index 000000000..8eb912b72 --- /dev/null +++ b/skills/src/lasr_skills/guide.py @@ -0,0 +1,43 @@ +import smach + +from geometry_msgs.msg import Pose + +from lasr_skills import GoToLocation, Say + + +class Guide(smach.StateMachine): + + def __init__(self, location_name: str, location_pose: Pose): + + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + + with self: + + smach.StateMachine.add( + "SAY_FOLLOW_ME", + Say(text=f"Please follow me, I will guide you to the {location_name}"), + transitions={ + "succeeded": "GO_TO_LOCATION", + "aborted": "GO_TO_LOCATION", + "preempted": "GO_TO_LOCATION", + }, + ) + + smach.StateMachine.add( + "GO_TO_LOCATION", + GoToLocation(location_pose), + transitions={ + "succeeded": "SAY_DONE", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "SAY_DONE", + Say(text=f"We have arrived at the {location_name}."), + transitions={ + "succeeded": "succeeded", + "aborted": "failed", + "preempted": "failed", + }, + ) diff --git a/skills/src/lasr_skills/vision/get_cropped_image.py b/skills/src/lasr_skills/vision/get_cropped_image.py index 0a03771f2..fc0b1f5f1 100755 --- a/skills/src/lasr_skills/vision/get_cropped_image.py +++ b/skills/src/lasr_skills/vision/get_cropped_image.py @@ -20,7 +20,7 @@ class GetCroppedImage(smach.State): def __init__( self, object_name: str, - method: str = "centered", + method: str = "closest", use_mask: bool = True, yolo_model: str = "yolov8x-seg.pt", yolo_model_confidence: float = 0.5, diff --git a/tasks/gpsr/config/floor_6.yaml b/tasks/gpsr/config/floor_6.yaml new file mode 100644 index 000000000..dc5f14a3d --- /dev/null +++ b/tasks/gpsr/config/floor_6.yaml @@ -0,0 +1,377 @@ +arena: + polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: -1.7358531951904297 + y: 1.9025657176971436 + z: 0.0 + - x: -0.7934322357177734 + y: -1.303222417831421 + z: 0.0 + - x: 6.543567180633545 + y: -4.914205074310303 + z: 0.0 + - x: 5.1799726486206055 + y: -0.0733039379119873 + z: 0.0 + rooms: + bedroom: + pose: + position: + x: 2.367072562284141 + y: -2.1529118634999143 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.6141410947219685 + w: 0.7891962466799384 + people_search_poses: + - position: + x: 0.11676260559392795 + y: -1.6380017166320768 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.33460244331795674 + w: 0.9423593820436307 + - position: + x: 4.462289676930589 + y: -0.5284883202773818 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.8599135222234082 + w: 0.5104397460007714 + room_polygon: + - x: -0.9275723695755005 + y: -1.6387131214141846 + z: 0.0 + - x: -0.043175578117370605 + y: -5.141121864318848 + z: 0.0 + - x: 6.543567180633545 + y: -4.914205074310303 + z: 0.0 + - x: 5.1799726486206055 + y: -0.0733039379119873 + z: 0.0 + beacons: + bed: + placeable: true + object_category: all + object_detection_pose: + position: + x: 2.562624240536362 + y: -3.1575592820569067 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.6357181444855956 + w: 0.771921265914984 + object_detection_point: + x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + object_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 
2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + person_detection_pose: + position: + x: 2.1915819264044605 + y: -1.849978880570599 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.6104570104528535 + w: 0.7920493913822324 + person_detection_polygon: + - x: 3.733201026916504 + y: -4.791120529174805 + z: 0.0 + - x: 2.2753207683563232 + y: -5.15361213684082 + z: 0.0 + - x: 0.8193349242210388 + y: -3.415123701095581 + z: 0.0 + - x: 3.4637396335601807 + y: -2.8448991775512695 + z: 0.0 + bedside_table: + placeable: true + object_category: all + object_detection_pose: + position: + x: 0.46844828109331477 + y: -2.7787364857911587 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.5940872852706002 + w: 0.8044005827197097 + object_detection_point: + x: 0.7542681694030762 + y: -3.81427001953125 + z: 0.5 + object_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + person_detection_pose: + position: + x: 1.466431032745179 + y: -2.110786589677801 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.8783946915245289 + w: 0.477935943303627 + person_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + shelf: + placeable: true + object_category: cleaning_supplies + object_detection_pose: + position: + x: 1.1910431437118454 + y: -1.4156583938830964 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.9895964310973278 + w: 0.14387113525454617 + object_detection_point: + x: -0.2082158327102661 + y: -1.8019976615905762 + z: 1.0 + object_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + person_detection_pose: + position: + x: 1.2790687447617268 + y: -1.8264770041619776 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.9999274005163251 + w: 0.012049634710841183 + person_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + living_room: + pose: + position: + x: 1.5507337906536043 + y: 1.1521238898558481 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: -0.6061077408064555 + w: 0.7953825535768902 + people_search_poses: + - position: + x: 4.410522430463428 + y: 0.15863509537529977 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.9586891958958602 + w: 0.2844556655659172 + - position: + x: -0.44435292073486515 + y: -0.2827648285579283 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.23620648127419586 + w: 0.9717028857650176 + room_polygon: + - x: -0.7934322357177734 + y: -1.303222417831421 + z: 0.0 + - x: -1.7358531951904297 + y: 1.9025657176971436 + z: 0.0 + - x: 4.142002105712891 + y: 3.2640256881713867 + z: 0.0 + - x: 4.7735161781311035 + y: 
-0.06541615724563599 + z: 0.0 + beacons: + sofa: + placeable: true + object_category: "any" + object_detection_pose: + position: + x: 1.1086318163079976 + y: 1.375791007054298 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.7888655924246054 + w: 0.6145657630299431 + object_detection_point: + x: 0.7934732437133789 + y: 2.2658238410949707 + z: 0.5 + object_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + person_detection_pose: + position: + x: 1.6487950425937603 + y: -0.7383865655062152 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.7984934210067214 + w: 0.6020035353791394 + person_detection_polygon: + - x: 3.733201026916504 + y: -4.791120529174805 + z: 0.0 + - x: 2.2753207683563232 + y: -5.15361213684082 + z: 0.0 + - x: 0.8193349242210388 + y: -3.415123701095581 + z: 0.0 + - x: 3.4637396335601807 + y: -2.8448991775512695 + z: 0.0 + bookshelf: + placeable: true + object_category: toys + object_detection_pose: + position: + x: 3.1121494229032702 + y: 2.1099085797580597 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.43746361630651703 + w: 0.8992361116014105 + object_detection_point: + x: 3.399003744125366 + y: 2.6125359535217285 + z: 0.5 + object_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 + person_detection_pose: + position: + x: 2.442729631618631 + y: 1.0970585856117665 + z: 0.0 + orientation: + x: 0.0 + y: 0.0 + z: 0.4160179084935318 + w: 0.9093564206694026 + person_detection_polygon: + # TODO: this is a dummy polygon, replace it with the actual polygon + - x: 2.8841753005981445 + y: -4.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -3.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -2.464163780212402 + z: 0.5 + - x: 2.8841753005981445 + y: -1.464163780212402 + z: 0.5 \ No newline at end of file diff --git a/tasks/gpsr/launch/setup.launch b/tasks/gpsr/launch/setup.launch new file mode 100644 index 000000000..48ae7cb34 --- /dev/null +++ b/tasks/gpsr/launch/setup.launch @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tasks/gpsr/scripts/main.py b/tasks/gpsr/scripts/main.py index 780b91a4e..c9222d626 100644 --- a/tasks/gpsr/scripts/main.py +++ b/tasks/gpsr/scripts/main.py @@ -1,16 +1,18 @@ #!/usr/bin/env python3 +import os import smach import rospy +import rospkg import sys from typing import Dict from gpsr.load_known_data import GPSRDataLoader -from gpsr.state_machine_factory import GPSRStateMachineFactory +from gpsr.state_machine_factory import build_state_machine from gpsr.regex_command_parser import Configuration from gpsr.states import CommandParserStateMachine def load_gpsr_configuration() -> Configuration: - gpsr_data_dir = sys.argv[1] + gpsr_data_dir = os.path.join(rospkg.RosPack().get_path("gpsr"), "data", "mock_data") """Loads the configuration for the GPSR command parser""" data_loader = GPSRDataLoader(data_dir=gpsr_data_dir) gpsr_known_data: Dict = data_loader.load_data() @@ -34,7 +36,7 @@ def main(): command_parser_sm.execute() parsed_command: Dict = 
command_parser_sm.userdata.parsed_command rospy.loginfo(f"Parsed command: {parsed_command}") - sm = GPSRStateMachineFactory(parsed_command).build_state_machine() + sm = build_state_machine(parsed_command) sm.execute() diff --git a/tasks/gpsr/src/gpsr/regex_command_parser.py b/tasks/gpsr/src/gpsr/regex_command_parser.py old mode 100644 new mode 100755 index 88777e44d..b1abff2a2 --- a/tasks/gpsr/src/gpsr/regex_command_parser.py +++ b/tasks/gpsr/src/gpsr/regex_command_parser.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 import itertools import re @@ -262,7 +263,7 @@ def get_possible_sub_commands(type: str) -> str: f"{verb('follow')} them {prep('toLocPrep')} the {Configuration.pick(configuration, 'location', ['room'])}" ) sub_commands.append( - f"{verb('guide')} them {prep('toLocPrep')} the {Configuration.pick(configuration, 'location', ['room'])}" + f"{verb('guide')} them {prep('toLocPrep')} the {Configuration.pick(configuration, 'location', ['room', 'loc'])}" ) elif type == "foundObj": sub_commands.append( @@ -400,6 +401,7 @@ def gpsr_parse(matches: Dict[str, str]) -> Dict[str, Any]: if key_to_check == "verb": result["commands"].append(reverse_translate_verb_dict(value)) result["command_params"].append({}) + elif key_to_check in [ "object", "location", @@ -409,8 +411,10 @@ def gpsr_parse(matches: Dict[str, str]) -> Dict[str, Any]: "start", "end", "objectcomp", + "personinfo", "clothes", "talk", + "pose", ]: value_to_add = value try: @@ -436,10 +440,14 @@ def gpsr_compile_and_parse(config: Configuration, input: str) -> Dict: object_categories = ( config["object_categories_singular"] + config["object_categories_plural"] ) - return parse_result_dict(gpsr_parse(matches), object_categories) + return parse_result_dict( + gpsr_parse(matches), object_categories, config["room_names"] + ) -def parse_result_dict(result: Dict, object_categories: List[str]) -> Dict: +def parse_result_dict( + result: Dict, object_categories: List[str], rooms: List[str] +) -> Dict: """Parses the result dictionary output by the gpsr parse to handle missing parameters. 
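To make the post-processing concrete, here is a minimal sketch of the behaviour implemented in the hunk below; the dicts are hand-written for illustration and do not come from a real parser run:

# Illustrative only: hand-written dicts approximating gpsr_parse output.
object_categories = ["snacks"]
rooms = ["bedroom"]
result = {
    "commands": ["go", "find"],
    "command_params": [{"location": "bedroom"}, {"object": "cup"}],
}
result = parse_result_dict(result, object_categories, rooms)
# command_params[0] becomes {"room": "bedroom"}: "location" is re-keyed to
# "room" because "bedroom" is a known room. command_params[1] keeps "object"
# and inherits parameters it is missing (e.g. name or object_category) from
# the previous command.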
@@ -458,6 +466,15 @@ def parse_result_dict(result: Dict, object_categories: List[str]) -> Dict: "command_params" ][i]["object"] del result["command_params"][i]["object"] + + # Rename location to room if it is a room + if "location" in result["command_params"][i]: + if result["command_params"][i]["location"] in rooms: + result["command_params"][i]["room"] = result["command_params"][i][ + "location" + ] + del result["command_params"][i]["location"] + # Update command params based on the previous commands params if i > 0: if "location" not in result["command_params"][i]: @@ -470,7 +487,6 @@ def parse_result_dict(result: Dict, object_categories: List[str]) -> Dict: result["command_params"][i - 1]["room"] = result["command_params"][ i - 1 ]["room"] - del result["command_params"][i]["room"] if "name" not in result["command_params"][i]: if "name" in result["command_params"][i - 1]: result["command_params"][i]["name"] = result["command_params"][ @@ -481,7 +497,11 @@ def parse_result_dict(result: Dict, object_categories: List[str]) -> Dict: result["command_params"][i]["object"] = result["command_params"][ i - 1 ]["object"] - + if "object_category" not in result["command_params"][i]: + if "object_category" in result["command_params"][i - 1]: + result["command_params"][i]["object_category"] = result[ + "command_params" + ][i - 1]["object_category"] return result @@ -498,9 +518,9 @@ def reverse_translate_verb_dict(verb: str) -> str: object_categories = object_categories_singular + object_categories_plural config: Configuration = { "person_names": ["guest1", "guest2"], - "location_names": ["sofa", "piano", "kitchen table"], + "location_names": ["sofa", "piano", "kitchen table", "bed"], "placement_location_names": ["kitchen table"], - "room_names": ["living room", "kitchen"], + "room_names": ["living room", "kitchen", "bedroom"], "object_names": ["cup", "television"], "object_categories_plural": ["sticks"], "object_categories_singular": ["stick"], @@ -512,38 +532,40 @@ def reverse_translate_verb_dict(verb: str) -> str: def execute(input: str, object_categories: List[str]): matches = regex.match(input).groupdict() - return parse_result_dict(gpsr_parse(matches), object_categories) + return parse_result_dict( + gpsr_parse(matches), object_categories, config["room_names"] + ) print( execute( - "go to the kitchen then meet guest1 and tell the time", + "go to the sofa then find a cup and take it and bring it to me", object_categories, ) ) - # print( - # execute( - # "navigate to the kitchen then find a cup and get it and bring it to the person pointing to the right in the kitchen", - # object_categories, - # ) - # ) + print( + execute( + "navigate to the kitchen then find a cup and get it and bring it to the person pointing to the right in the kitchen", + object_categories, + ) + ) - # print( - # execute( - # "navigate to the kitchen then find a cup and get it and bring it to me", - # object_categories, - # ) - # ) - # print( - # execute( - # "navigate to the kitchen table then find a stick and fetch it and deliver it to guest1 in the living room", - # object_categories, - # ) - # ) + print( + execute( + "navigate to the kitchen then find a cup and get it and bring it to me", + object_categories, + ) + ) + print( + execute( + "tell me the pose of the person at the sofa", + object_categories, + ) + ) # print( # execute( - # "lead the person wearing a red shirt from the sofa to the living room", + # "navigate to the kitchen table then find a stick.", # object_categories, # ) # ) diff --git 
a/tasks/gpsr/src/gpsr/state_machine_factory.py b/tasks/gpsr/src/gpsr/state_machine_factory.py index 2e005919b..74974e0e5 100644 --- a/tasks/gpsr/src/gpsr/state_machine_factory.py +++ b/tasks/gpsr/src/gpsr/state_machine_factory.py @@ -1,165 +1,706 @@ #!/usr/bin/env python3 import rospy import smach +import smach_ros from typing import Dict, List -from lasr_skills import GoToLocation, FindNamedPerson, FindGesturePerson # type: ignore -from gpsr.states import Talk +from lasr_skills import ( + GoToLocation, + FindPerson, + Guide, + HandoverObject, + Say, + ReceiveObject, +) +from lasr_person_following.msg import FollowAction, FollowGoal -class GPSRStateMachineFactory: - def __init__(self, parsed_command: Dict): - """Stores the parsed command, initializes the state count, - and initalises the state machine. +from gpsr.states import Talk, QuestionAnswer, GoFindTheObject, ObjectComparison +import os +import rospkg - Args: - parsed_command (Dict): parsed command output by the - command parser. - """ - self.state_count: int = 0 - self.parsed_command: dict = parsed_command - self.sm = smach.StateMachine(outcomes=["succeeded", "failed"]) +from geometry_msgs.msg import ( + Pose, + Point, + Quaternion, + Polygon, + PoseWithCovarianceStamped, +) - def _increment_state_count(self): - self.state_count += 1 - return self.state_count +STATE_COUNT = 0 +from lasr_skills import GoToLocation - def _handle_take_command(self, command_param: Dict): - raise NotImplementedError("Take command not implemented") - def _handle_place_command(self, command_param: Dict): - raise NotImplementedError("Place command not implemented") +""" +Helpers +""" - def _handle_deliver_command(self, command_param: Dict): - raise NotImplementedError("Deliver command not implemented") - def _handle_bring_command(self, command_param: Dict): - raise NotImplementedError("Bring command not implemented") +def increment_state_count() -> int: + global STATE_COUNT + STATE_COUNT += 1 + return STATE_COUNT - def _handle_go_command(self, command_param: Dict): - raise NotImplementedError("Go command not implemented") - def _handle_find_command(self, command_param: Dict): - raise NotImplementedError("Find command not implemented") +def get_location_room(location: str) -> str: + rooms = rospy.get_param("/gpsr/arena/rooms") + for room in rooms: + if location in rooms[room]["beacons"]: + return room + raise ValueError(f"Location {location} not found in the arena") - def _handle_talk_command(self, command_param: Dict): - if "gesture" in command_param: - if not "location" in command_param: - raise ValueError( - "Talk command with gesture but no room in command parameters" - ) - location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" - self.sm.add( - f"STATE_{self._increment_state_count()}", - FindGesturePerson( - location_param=location_param_room, gesture=command_param["gesture"] - ), - transitions={ - "succeeded": f"STATE_{self.state_count + 1}", - "failed": "failed", - }, - ) - if "talk" in command_param: - self.sm.add( - f"STATE_{self._increment_state_count()}", - Talk(talk_phrase=command_param["talk"]), - transitions={ - "succeeded": f"STATE_{self.state_count + 1}", - "failed": "failed", - }, + +def get_current_pose() -> Pose: + pose = rospy.wait_for_message("/amcl_pose", PoseWithCovarianceStamped) + return pose.pose.pose + + +def get_location_pose(location: str, person: bool) -> Pose: + location_room = get_location_room(location) + if person: + location_pose: Dict = rospy.get_param( + 
f"/gpsr/arena/rooms/{location_room}/beacons/{location}/person_detection_pose" + ) + else: + location_pose: Dict = rospy.get_param( + f"/gpsr/arena/rooms/{location_room}/beacons/{location}/object_detection_pose" + ) + + return Pose( + position=Point(**location_pose["position"]), + orientation=Quaternion(**location_pose["orientation"]), + ) + + +def get_room_pose(room: str) -> Pose: + location_pose: Dict = rospy.get_param(f"/gpsr/arena/rooms/{room}/pose") + return Pose( + position=Point(**location_pose["position"]), + orientation=Quaternion(**location_pose["orientation"]), + ) + + +def get_person_detection_poses(room: str) -> List[Pose]: + poses = [] + for pose in rospy.get_param(f"/gpsr/arena/rooms/{room}/people_search_poses"): + poses.append( + Pose( + position=Point(**pose["position"]), + orientation=Quaternion(**pose["orientation"]), ) - else: - raise ValueError( - "Talk command received with no text or gesture in command parameters" + ) + return poses + + +def get_room_polygon(room: str) -> Polygon: + return Polygon( + [Point(**p) for p in rospy.get_param(f"/gpsr/arena/rooms/{room}/room_polygon")] + ) + + +def get_person_detection_polygon(location: str) -> Polygon: + location_room = get_location_room(location) + return Polygon( + [ + Point(**p) + for p in rospy.get_param( + f"/gpsr/arena/rooms/{location_room}/beacons/{location}/person_detection_polygon" ) + ] + ) - def _handle_answer_command(self, command_param: Dict): - raise NotImplementedError("Answer command not implemented") - - def _handle_meet_command(self, command_param: Dict): - raise NotImplementedError("Meet command not implemented") - - def _handle_tell_command(self, command_param: Dict): - raise NotImplementedError("Tell command not implemented") - - def _handle_greet_command(self, command_param: Dict): - if "name" in command_param: - location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" - location_param_pose = f"{location_param_room}/pose" - self.sm.add( - f"STATE_{self._increment_state_count()}", - GoToLocation(location_param=location_param_pose), - transitions={ - "succeeded": f"STATE_{self.state_count + 1}", - "failed": "failed", - }, + +def get_object_detection_polygon(location: str) -> Polygon: + location_room = get_location_room(location) + return Polygon( + [ + Point(**p) + for p in rospy.get_param( + f"/gpsr/arena/rooms/{location_room}/beacons/{location}/object_detection_polygon" ) - self.sm.add( - f"STATE_{self._increment_state_count()}", - FindNamedPerson( - name=command_param["name"], - location_param=location_param_room, - ), - transitions={ - "succeeded": f"STATE_{self.state_count + 1}", - "failed": "failed", - }, + ] + ) + + +""" +Verbs +""" + + +def greet(command_param: Dict, sm: smach.StateMachine) -> None: + if "room" in command_param: + waypoints: List[Pose] = get_person_detection_poses(command_param["room"]) + polygon: Polygon = get_room_polygon(command_param["room"]) + elif "location" in command_param: + waypoints: List[Pose] = [get_location_pose(command_param["location"], True)] + polygon: Polygon = get_person_detection_polygon(command_param["location"]) + else: + raise ValueError( + "Greet command received with no room or location in command parameters" + ) + + if "name" in command_param: + criteria = "name" + criteria_value = command_param["name"] + elif "clothes" in command_param: + criteria = "clothes" + criteria_value = command_param["clothes"] + elif "gesture" in command_param: + criteria = "gesture" + criteria_value = command_param["gesture"] + elif "pose" in command_param: + 
criteria = "pose" + criteria_value = command_param["pose"] + else: + raise ValueError( + "Greet command received with no name, clothes, gesture, or pose in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + FindPerson( + waypoints=waypoints, + polygon=polygon, + criteria=criteria, + criteria_value=criteria_value, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def talk(command_param: Dict, sm: smach.StateMachine) -> None: + """ + This combines talk and tell as they use the same verb. + + Talk: + - Say a given response + - Say a given response to a person in a given gesture at a given location. + + Tell: + People + + Need additional find person skill, then handle the following commands here + + - Tell me the name of the person at a given room or location. + - Tell me the pose of the person at a given room or location. + - Tell me the gesture of the person at a given room or location. + + Objects + + This can all be a single skill that takes img_msg, runs inference, outputs object name + + - Tell me what is the biggest object at a given placement location + - Tell me what is the largest object at a given placement location + - Tell me what is the smallest object at a given placement location + - Tell me what is the heaviest object at a given placement location + - Tell me what is the lightest object at a given placement location + - Tell me what is the thinnest object at a given placement location + + - Tell me what is the biggest object in a given category at a given placement location + - Tell me what is the largest object in a given category at a given placement location + - Tell me what is the smallest object in a given category at a given placement location + - Tell me what is the heaviest object in a given category at a given placement location + - Tell me what is the lightest object in a given category at a given placement location + - Tell me what is the thinnest object at in a given category a given placement location + + + """ + if "gesture" in command_param or "pose" in command_param or "name" in command_param: + find(command_param, sm) + + # go back to the start, and report + + if "talk" in command_param: + sm.add( + f"STATE_{increment_state_count()}", + Talk(command_param["talk"]), + transitions={"succeeded": "succeeded", "failed": "failed"}, + ) + elif "object_category" in command_param: + if not "location" in command_param: + raise ValueError( + "Tell command with object but no room in command parameters" ) - elif "clothes" in command_param: - raise NotImplementedError("Greet command with clothes not implemented") + location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" + sm.add( + f"STATE_{increment_state_count()}", + GoToLocation(location_param=f"{location_param_room}/pose"), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + # TODO: combine the weight list within + # TODO: add speak out the result + weight_list = rospy.get_param("/Object_list/Object") + sm.add( + f"STATE_{increment_state_count()}", + ObjectComparison( + filter=command_param["object_category"], + operation_label="weight", + weight=weight_list, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + sm.add( + f"STATE_{increment_state_count()}", + ObjectComparison( + filter=command_param["object_category"], + operation_label="size", # need the relation between command and operation + weight=weight_list, + ), + transitions={ + 
"succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + elif "personinfo" in command_param: + # personinfo: pose, gesture, name + pass + + +def guide(command_param: Dict, sm: smach.StateMachine) -> None: + """ + Guide commands can be of the following form: + + - Guide a person in front of the robot to a given room. + - Guide a named person from a start location to a destination room or location. + - Guide a person in a given gesture from a given start location to a destination room or location. + - Guide a person in a given pose from a given start location to a destination room or location. + - Guide a person in a given set of clothes from a given start location to a destination room or location. + + """ + + find_person = False + if "start" not in command_param: + pass + elif "name" in command_param: + criteria = "name" + criteria_value = command_param["name"] + find_person = True + elif "clothes" in command_param: + criteria = "clothes" + criteria_value = command_param["clothes"] + find_person = True + elif "gesture" in command_param: + criteria = "gesture" + criteria_value = command_param["gesture"] + find_person = True + elif "pose" in command_param: + criteria = "pose" + criteria_value = command_param["pose"] + find_person = True + else: + location_pose = get_room_pose(command_param["room"]) + location_name = command_param["room"] + + if find_person: + start_loc = command_param["start"] + start_pose = get_location_pose(start_loc, person=True) + room = get_location_room(start_loc) + polygon: Polygon = get_room_polygon(room) + + sm.add( + f"STATE_{increment_state_count()}", + FindPerson( + waypoints=[start_pose], + polygon=polygon, + criteria=criteria, + criteria_value=criteria_value, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + try: + location_pose = get_location_pose(command_param["end"], person=True) + location_name = command_param["end"] + except KeyError: + location_pose = get_room_pose(command_param["end"]) + location_name = command_param["end"] + else: + if "room" in command_param: + location_pose = get_room_pose(command_param["room"]) + location_name = command_param["room"] + elif "location" in command_param: + location_pose = get_location_pose(command_param["location"], True) + location_name = command_param["location"] else: raise ValueError( - "Greet command received with no name or clothes in command parameters" + "Guide command received with no room or location in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + Guide(location_name=location_name, location_pose=location_pose), + transitions={"succeeded": "succeeded", "failed": "failed"}, + ) + + +def deliver(command_param: Dict, sm: smach.StateMachine) -> None: + """ + The possible deliver commands are: + - deliver an object the robot aleady has to me + - deliver an object the robot already has to a person in a given gesture in a given room. + - deliver an object the robot already has to a person in a given pose in a given room. + - deliver an object the robot already has to a person with a given name in a given room. 
+ + + """ + + if "room" in command_param: + waypoints: List[Pose] = get_person_detection_poses(command_param["room"]) + polygon: Polygon = get_room_polygon(command_param["room"]) + + if "name" in command_param: + criteria = "name" + criteria_value = command_param["name"] + elif "gesture" in command_param: + criteria = "gesture" + criteria_value = command_param["gesture"] + elif "pose" in command_param: + criteria = "pose" + criteria_value = command_param["pose"] + else: + criteria = None + waypoints = [get_current_pose()] + + sm.add( + f"STATE_{increment_state_count()}", + FindPerson( + waypoints=waypoints, + polygon=polygon, + criteria=criteria, + criteria_value=criteria_value, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + if "object" in command_param: + object_name = command_param["object"] + elif "object_category" in command_param: + object_name = command_param["object_category"] + else: + raise ValueError( + "Deliver command received with no object or object category in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + HandoverObject(object_name=object_name), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def place(command_param: Dict, sm: smach.StateMachine) -> None: + + if "location" in command_param: + location_pose = get_location_pose(command_param["location"], False) + else: + raise ValueError( + "Place command received with no location in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + GoToLocation(location=location_pose), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + if "object" in command_param: + object_name = command_param["object"] + elif "object_category" in command_param: + object_name = command_param["object_category"] + else: + raise ValueError( + "Place command received with no object or object category in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + HandoverObject(object_name=object_name), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + sm.add( + f"STATE_{increment_state_count()}", + Say(text=f"Please place the object on the {command_param['location']}"), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + +def take(command_param: Dict, sm: smach.StateMachine) -> None: + + if "object" in command_param: + criteria = "object" + criteria_value = command_param["object"] + elif "object_category" in command_param: + criteria = "object_category" + criteria_value = command_param["object_category"] + else: + raise ValueError( + "Take command received with no object or object category in command parameters" + ) + if "location" in command_param: + location_pose = get_location_pose(command_param["location"], False) + else: + raise ValueError("Take command received with no location in command parameters") + + # TODO: this should use find, to find the object at the location + sm.add( + f"STATE_{increment_state_count()}", + GoToLocation(location=location_pose), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + sm.add( + f"STATE_{increment_state_count()}", + Say( + text=f"Please pick up the {criteria_value} on the {command_param['location']} for me." 
+ ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + sm.add( + f"STATE_{increment_state_count()}", + ReceiveObject(object_name=criteria_value), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def answer(command_param: Dict, sm: smach.StateMachine) -> None: + data_root = os.path.join(rospkg.RosPack().get_path("gpsr"), "data") + sm.add( + f"STATE_{increment_state_count()}", + QuestionAnswer( + index_path=os.path.join(data_root, "questions.index"), + txt_path=os.path.join(data_root, "questions.txt"), + xml_path=os.path.join(data_root, "questions.xml"), + k=1, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def go(command_param: Dict, sm: smach.StateMachine, person: bool) -> None: + """ + The possible GO commands are: + - Go to a location or room and then... + + Person is used to denote whether the robot should go to the location's person + pose or object pose. + + """ + if "location" in command_param: + target_pose = get_location_pose(command_param["location"], person) + elif "room" in command_param: + target_pose = get_room_pose(command_param["room"]) + else: + raise ValueError( + "Go command received with no location or room in command parameters" + ) + + sm.add( + f"STATE_{increment_state_count()}", + GoToLocation(location=target_pose), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def bring(command_param: Dict, sm: smach.StateMachine) -> None: + raise NotImplementedError("Bring command not implemented") + + +def find(command_param: Dict, sm: smach.StateMachine) -> None: + """ + find an object in the given room + """ + + if "object_category" in command_param: + raise NotImplementedError("Find command not implemented") + if not "location" in command_param: + raise ValueError( + "Find command with object category but no location in command parameters" + ) + location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" + sm.add( + f"STATE_{increment_state_count()}", + GoFindTheObject( + location_param=location_param_room, + filter=command_param["object_category"], + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + elif "gesture" in command_param: + greet(command_param, sm) + + +def meet(command_param: Dict, sm: smach.StateMachine) -> None: + """ + Meet commands can be of the following form: + + - Meet a person at a location and execute another command. + - Meet a person in a room and execute another command. + - Meet a person in a room then find them in another room.
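Since meet() simply delegates to greet(), the same parameter resolution applies; a sketch with invented values:

# Hypothetical: greet() resolves waypoints from "room" (people_search_poses)
# or "location" (person_detection_pose), then picks the first criterion
# present, in the order name, clothes, gesture, pose.
meet_param = {"name": "guest1", "room": "living room"}
# greet(meet_param, sm) adds FindPerson(criteria="name",
# criteria_value="guest1") over the room's search poses and polygon.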
+ """ + greet(command_param, sm) - def _handle_count_command(self, command_params: Dict): - raise NotImplementedError("Count command not implemented") - - def _handle_follow_command(self, command_params: Dict): - raise NotImplementedError("Follow command not implemented") - - def _handle_guide_command(self, command_params: Dict): - raise NotImplementedError("Guide command not implemented") - - def build_state_machine(self) -> smach.StateMachine: - with self.sm: - command_verbs: List[str] = self.parsed_command["commands"] - command_params: List[Dict] = self.parsed_command["command_params"] - for command_verb, command_param in zip(command_verbs, command_params): - if command_verb == "take": - self._handle_take_command(command_param) - elif command_verb == "place": - self._handle_place_command(command_param) - elif command_verb == "deliver": - self._handle_deliver_command(command_param) - elif command_verb == "bring": - self._handle_bring_command(command_param) - elif command_verb == "go": - self._handle_go_command(command_param) - elif command_verb == "find": - self._handle_find_command(command_param) - elif command_verb == "talk": - self._handle_talk_command(command_param) - elif command_verb == "answer": - self._handle_answer_command(command_param) - elif command_verb == "meet": - self._handle_meet_command(command_param) - elif command_verb == "tell": - self._handle_tell_command(command_param) - elif command_verb == "greet": - self._handle_greet_command(command_param) - elif command_verb == "count": - self._handle_count_command(command_param) - elif command_verb == "follow": - self._handle_follow_command(command_param) - elif command_verb == "guide": - self._handle_guide_command(command_param) - else: - raise ValueError(f"Invalid command verb: {command_verb}") - - self.sm.add( - f"STATE_{self.state_count+1}", - Talk(talk_phrase="I have completed the task."), - transitions={"succeeded": "succeeded", "failed": "failed"}, + +def count(command_param: Dict, sm: smach.StateMachine) -> None: + """ + count the number of a category of objects at a given placement location + """ + + # TODO + raise NotImplementedError("Count command not implemented") + + if "object_category" in command_param: + if not "location" in command_param: + raise ValueError( + "Count command with object but no room in command parameters" ) + location_param_room = f"/gpsr/arena/rooms/{command_param['location']}" + sm.add( + f"STATE_{increment_state_count()}", + GoToLocation(location_param=f"{location_param_room}/pose"), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + # TODO: combine the weight list within + weight_list = rospy.get_param("/Object_list/Object") + sm.add( + f"STATE_{increment_state_count()}", + ObjectComparison( + filter=command_param["object_category"], + operation_label="count", + weight=weight_list, + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "failed": "failed", + }, + ) + + +def follow(command_param: Dict, sm: smach.StateMachine) -> None: + sm.add( + f"STATE_{increment_state_count()}", + smach_ros.SimpleActionState( + "/follow_person", + FollowAction, + goal=FollowGoal(), + ), + transitions={ + "succeeded": f"STATE_{STATE_COUNT + 1}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + +""" +Build +""" + + +def build_state_machine(parsed_command: Dict) -> smach.StateMachine: + """Constructs the parameterized state machine for the GPSR task, + given the parsed command. + + Args: + parsed_command (Dict): parsed command. 
+ + Returns: + smach.StateMachine: parameterized state machine ready to be executed. + """ + command_verbs: List[str] = parsed_command["commands"] + command_params: List[Dict] = parsed_command["command_params"] + sm = smach.StateMachine(outcomes=["succeeded", "failed"]) + with sm: + for index, (command_verb, command_param) in enumerate( + zip(command_verbs, command_params) + ): + if command_verb == "greet": + greet(command_param, sm) + elif command_verb == "talk": + talk(command_param, sm) + elif command_verb == "guide": + guide(command_param, sm) + elif command_verb == "deliver": + deliver(command_param, sm) + elif command_verb == "place": + place(command_param, sm) + elif command_verb == "take": + take(command_param, sm) + elif command_verb == "answer": + answer(command_param, sm) + elif command_verb == "go": + person = not any( + "object" in param or "object_category" in param + for param in command_params[index:] + ) + go(command_param, sm, person) + elif command_verb == "bring": + bring(command_param, sm) + elif command_verb == "find": + find(command_param, sm) + elif command_verb == "meet": + meet(command_param, sm) + elif command_verb == "count": + count(command_param, sm) + elif command_verb == "follow": + follow(command_param, sm) + else: + raise ValueError(f"Unrecognised command verb: {command_verb}") - return self.sm + rospy.loginfo(f"State machine: {sm}") + return sm diff --git a/tasks/gpsr/src/gpsr/states/__init__.py b/tasks/gpsr/src/gpsr/states/__init__.py index 84504f968..b0af86fd2 100644 --- a/tasks/gpsr/src/gpsr/states/__init__.py +++ b/tasks/gpsr/src/gpsr/states/__init__.py @@ -1,3 +1,6 @@ from .talk import Talk from .command_similarity_matcher import CommandSimilarityMatcher from .command_parser import ParseCommand, CommandParserStateMachine +from .question_answer import QuestionAnswer +from .go_find_the_object import GoFindTheObject +from .object_comparison import ObjectComparison diff --git a/tasks/gpsr/src/gpsr/states/command_parser.py b/tasks/gpsr/src/gpsr/states/command_parser.py index f0345bdcb..d572655bf 100644 --- a/tasks/gpsr/src/gpsr/states/command_parser.py +++ b/tasks/gpsr/src/gpsr/states/command_parser.py @@ -48,7 +48,7 @@ def __init__( Args: data_config (Configuration): Configuration object containing the regex patterns n_vecs_per_txt_file (int, optional): number of vectors in each gpsr txt - file. Defaults to 100. + file. Defaults to 1177943. total_txt_files (int, optional): total number of gpsr txt files. Defaults to 10.
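A usage sketch, modelled on tasks/gpsr/scripts/main.py; the constructor call is an assumption (only execute() and the userdata read are visible there), with the documented defaults left in place:

# Assumed construction; execute() and userdata.parsed_command follow main.py.
config = load_gpsr_configuration()
command_parser_sm = CommandParserStateMachine(data_config=config)
command_parser_sm.execute()
parsed_command = command_parser_sm.userdata.parsed_command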
""" smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) diff --git a/tasks/gpsr/src/gpsr/states/go_find_the_object.py b/tasks/gpsr/src/gpsr/states/go_find_the_object.py index 99f365db8..9256005db 100755 --- a/tasks/gpsr/src/gpsr/states/go_find_the_object.py +++ b/tasks/gpsr/src/gpsr/states/go_find_the_object.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 -from go_to_location import GoToLocation import smach -from lasr_skills import Detect3D +from lasr_skills import Detect3D, GoToLocation from shapely.geometry.polygon import Polygon from typing import List, Union from geometry_msgs.msg import Pose, Point, Quaternion from lasr_skills import Say, PlayMotion +import rospy """ location = rospy.get_param("/start") -> python dict @@ -14,7 +14,20 @@ """ -class Go_find_the_object(smach.StateMachine): +class GoFindTheObject(smach.StateMachine): + class GetLocation(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["location_index", "waypoints"], + output_keys=["location"], + ) + + def execute(self, userdata): + userdata.location = userdata.waypoints[userdata.location_index] + return "succeeded" + class check_objects(smach.State): def __init__(self): smach.State.__init__( @@ -86,146 +99,200 @@ def execute(self, userdata): def __init__( self, - area_polygon: Polygon, depth_topic: str = "/xtion/depth_registered/points", model: str = "yolov8n-seg.pt", filter: Union[List[str], None] = None, - locations: Union[List[dict], None] = None, + waypoints: Union[List[Pose], None] = None, + locations: Union[str, None] = None, confidence: float = 0.5, nms: float = 0.3, motions: Union[List[str], None] = None, ): smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + if waypoints is None and locations is None: + raise ValueError("Either waypoints or location_param must be provided") + + if waypoints is None: + waypoints_to_iterate: List[Pose] = [] + room = rospy.get_param(locations) + beacons = room["beacons"] + for beacon in beacons: + waypoint = Pose( + position=Point(**beacons[beacon]["near_pose"]["position"]), + orientation=Quaternion( + **beacons[beacon]["near_pose"]["orientation"] + ), + ) + waypoints_to_iterate.append(waypoint) + else: + waypoints_to_iterate: List[Pose] = waypoints + + if motions is None: + motions = ["look_down", "look_down_left", "look_down_right"] + say_object_found = Say("I found the object here!") - say_object_not_found = Say("the object is not here") + say_object_not_found = Say("The object is not here") with self: - state = 0 + waypoint_iterator = smach.Iterator( + outcomes=["succeeded", "failed"], + it=lambda: range(len(waypoints_to_iterate)), + it_label="location_index", + input_keys=["waypoints"], + output_keys=["cumulated_result"], + exhausted_outcome="failed", + ) - # Outer loop iterates over locations - for i, location in enumerate(locations): - pose = Pose( - position=Point(**location["pose"]["position"]), - orientation=Quaternion(**location["pose"]["orientation"]), + with waypoint_iterator: + container_sm = smach.StateMachine( + outcomes=["succeeded", "failed", "continue"], + input_keys=["location_index", "waypoints"], + output_keys=["cumulated_result"], ) - # Add the initial state for going to this location - smach.StateMachine.add( - f"LOCATION_{i}", - GoToLocation(pose), - transitions={ - "succeeded": f"LOOK_AROUND_{state}", - "aborted": "failed", - "preempted": "failed", - }, - ) + with container_sm: + smach.StateMachine.add( + "GET_LOCATION", + self.GetLocation(), + 
transitions={"succeeded": "GO_TO_LOCATION", "failed": "failed"}, + ) - # Inner loop iterates over motions - for index, direction in enumerate(motions): - print(direction) - # Add a state to perform the current motion smach.StateMachine.add( - f"LOOK_AROUND_{state}", - PlayMotion(motion_name=direction), + "GO_TO_LOCATION", + GoToLocation(), transitions={ - "succeeded": f"DETECT_OBJECTS_3D_{state}", - "aborted": f"DETECT_OBJECTS_3D_{state}", - "preempted": "failed", + "succeeded": "INNER_ITERATOR", + "failed": "failed", }, ) - # Add a state to detect 3D objects after performing the motion + inner_iterator = smach.Iterator( + outcomes=["succeeded", "failed", "continue"], + it=lambda: range(len(motions)), + it_label="motion_index", + input_keys=["waypoints", "location_index"], + output_keys=["cumulated_result"], + exhausted_outcome="succeeded", + ) + + with inner_iterator: + inner_container_sm = smach.StateMachine( + outcomes=["succeeded", "failed", "continue"], + input_keys=["motion_index", "location_index", "waypoints"], + output_keys=["cumulated_result"], + ) + + with inner_container_sm: + smach.StateMachine.add( + "LOOK_AROUND", + PlayMotion(), + transitions={ + "succeeded": "DETECT_OBJECTS_3D", + "aborted": "DETECT_OBJECTS_3D", + "preempted": "failed", + }, + ) + + smach.StateMachine.add( + "DETECT_OBJECTS_3D", + Detect3D( + depth_topic=depth_topic, + model=model, + filter=filter, + confidence=confidence, + nms=nms, + ), + transitions={ + "succeeded": "RESULT", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "RESULT", + self.check_objects(), + transitions={ + "succeeded": "SAVE_RESULT", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "SAVE_RESULT", + self.cumulate_result(), + transitions={ + "succeeded": "continue", + "failed": "failed", + }, + ) + + inner_iterator.set_contained_state( + "INNER_CONTAINER_STATE", + inner_container_sm, + loop_outcomes=["continue"], + ) + smach.StateMachine.add( - f"DETECT_OBJECTS_3D_{state}", - Detect3D( - depth_topic=depth_topic, - model=model, - filter=filter, - confidence=confidence, - nms=nms, - ), + "INNER_ITERATOR", + inner_iterator, + {"succeeded": "CHECK_RESULT", "failed": "failed"}, + ) + + smach.StateMachine.add( + "CHECK_RESULT", + self.detection_result(), transitions={ - "succeeded": f"RESULT_{state}", + "object_found": "SAY_OBJECT_FOUND", + "object_not_found": "SAY_OBJECT_NOT_FOUND", "failed": "failed", }, ) - # Add a state to check objects smach.StateMachine.add( - f"RESULT_{state}", - self.check_objects(), + "SAY_OBJECT_FOUND", + say_object_found, transitions={ - "succeeded": f"SAVE_RESULT_{state}", - "failed": "failed", + "succeeded": "succeeded", + "aborted": "failed", + "preempted": "failed", }, ) + smach.StateMachine.add( - f"SAVE_RESULT_{state}", - self.cumulate_result(), + "SAY_OBJECT_NOT_FOUND", + say_object_not_found, transitions={ - "succeeded": ( - f"LOOK_AROUND_{state+1}" - if index != (len(motions) - 1) - else f"CHECK_RESULT_{i}" - ), - "failed": "failed", + "succeeded": "continue", + "aborted": "failed", + "preempted": "failed", }, ) - state += 1 - - smach.StateMachine.add( - f"CHECK_RESULT_{i}", - self.detection_result(), - transitions={ - "object_found": f"SAY_OBJECT_FOUND_{i}", - "object_not_found": f"SAY_OBJECT_NOT_FOUND_{i}", - "failed": "failed", - }, - ) - # Add states to handle object found or not found results - smach.StateMachine.add( - f"SAY_OBJECT_FOUND_{i}", - say_object_found, - transitions={ - "succeeded": ( - f"LOCATION_{i+1}" - if i != len(locations) - 1 - else "succeeded" - ), - 
"aborted": "failed", - "preempted": "failed", - }, - ) - smach.StateMachine.add( - f"SAY_OBJECT_NOT_FOUND_{i}", - say_object_not_found, - transitions={ - "succeeded": ( - f"LOCATION_{i+1}" - if i != len(locations) - 1 - else "succeeded" - ), - "aborted": "failed", - "preempted": "failed", - }, + + waypoint_iterator.set_contained_state( + "CONTAINER_STATE", container_sm, loop_outcomes=["continue"] ) + smach.StateMachine.add( + "WAYPOINT_ITERATOR", + waypoint_iterator, + {"succeeded": "succeeded", "failed": "failed"}, + ) + + +# if __name__ == "__main__": +# import rospy +# from sensor_msgs.msg import PointCloud2 -if __name__ == "__main__": - import rospy - from sensor_msgs.msg import PointCloud2 - - location_list = list() - for i in range(3): - location = rospy.get_param(f"/LOCATION_{i}") - location_list.append(location) - motion = ["look_down", "look_down_left", "look_down_right"] - rospy.init_node("test_find_object") - sm = Go_find_the_object( - Polygon(), filter=["cup"], locations=location_list, motions=motion - ) - sm.userdata.pcl_msg = rospy.wait_for_message( - "/xtion/depth_registered/points", PointCloud2 - ) - sm.execute() +# location_list = list() +# for i in range(3): +# location = rospy.get_param(f"/LOCATION_{i}") +# location_list.append(location) +# rospy.init_node("test_find_object") +# sm = GoFindTheObject( +# Polygon(), filter=["cup"], locations=location_list, motions=motion +# ) +# sm.userdata.pcl_msg = rospy.wait_for_message( +# "/xtion/depth_registered/points", PointCloud2 +# ) +# sm.execute() diff --git a/tasks/gpsr/src/gpsr/states/object_comparison.py b/tasks/gpsr/src/gpsr/states/object_comparison.py index 579c34a16..e169bfb6c 100755 --- a/tasks/gpsr/src/gpsr/states/object_comparison.py +++ b/tasks/gpsr/src/gpsr/states/object_comparison.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 import smach -from lasr_skills import Detect3DInArea +from lasr_skills import Detect3DInArea, Say from lasr_skills import Detect3D from lasr_skills import Detect from shapely.geometry.polygon import Polygon from typing import List, Union, Dict +import rospy class ObjectComparison(smach.StateMachine): @@ -18,34 +19,21 @@ def __init__(self, area_polygon: Polygon): ) def count_types(self, detections): - """ - Count the number of different object types in detections. 
- - :param detections: List of detection tuples (object_type, position) - :return: Dictionary with object types as keys and counts as values - """ object_counts = {} for detection in detections: object_type = detection.name - - if len(object_counts.keys()) == 0: - object_counts[object_type] = 1 + if object_type in object_counts: + object_counts[object_type] += 1 else: - if object_type in object_counts.keys(): - object_counts[object_type] += 1 - else: - object_counts[object_type] = 1 + object_counts[object_type] = 1 return object_counts def execute(self, userdata): - print(userdata.detections_3d) filtered_detections = userdata.detections_3d rospy.loginfo(filtered_detections) - rospy.loginfo(type(filtered_detections.detected_objects[0])) object_counts = self.count_types(filtered_detections.detected_objects) - print(object_counts) userdata.object_dict = object_counts - userdata.detections_types = object_counts.keys() + userdata.detections_types = list(object_counts.keys()) return "succeeded" class CountCategory(smach.State): @@ -72,10 +60,8 @@ def execute(self, userdata): category_counts = { key: value for key, value in counts.items() if value != 0 } - print("finished counting") - print(category_counts) userdata.category_dict = category_counts - userdata.detections_categories = category_counts.keys() + userdata.detections_categories = list(category_counts.keys()) return "succeeded" class ObjectWeight(smach.State): @@ -89,19 +75,12 @@ def __init__(self, object_weight: Union[List[dict], None] = None): self.object_weight = object_weight def get_weight(self, detections, average_weights): - """ - rank the weight of a list of objects. - - :param detections: List of detection tuples (object_type, position) - :return: a list of the weight rank of object in a category(max->min) - """ weight_dict = {} for category, items in average_weights.items(): for i in detections: if i in items: weight = items[i] weight_dict[i] = weight - return weight_dict def execute(self, userdata): @@ -112,7 +91,6 @@ def execute(self, userdata): weights_dict.items(), key=lambda item: item[1], reverse=True ) userdata.sorted_weights = sorted_weights - print(sorted_weights) return "succeeded" class ObjectSize(smach.State): @@ -125,12 +103,6 @@ def __init__(self): ) def property_size_calculation(self, detections, result): - """ - calculate the size of a list of objects using bounding box. 
-        :param detections: List of detection tuples (object_type, position)
-        :return: a list of the size of each object in a category
-        """
-
         area = dict()
         for i in detections:
             for object in result.detected_objects:
@@ -139,7 +111,7 @@ def execute(self, userdata):
         return area

     def execute(self, userdata):
-        detections_types = userdata.object_dict.keys()
+        detections_types = list(userdata.object_dict.keys())
         area_dict = self.property_size_calculation(
             detections_types, userdata.detections_3d
         )
@@ -147,25 +119,79 @@ def execute(self, userdata):
             area_dict.items(), key=lambda item: item[1], reverse=True
         )
         userdata.sorted_size = sorted_size
-        print(sorted_size)
         return "succeeded"

+    class DecideOperation(smach.State):
+        def __init__(self):
+            smach.State.__init__(
+                self,
+                outcomes=["do_count", "do_weight", "do_size", "failed"],
+                input_keys=["operation_label"],
+            )
+
+        def execute(self, userdata):
+            if userdata.operation_label == "count":
+                return "do_count"
+            elif userdata.operation_label == "weight":
+                return "do_weight"
+            elif userdata.operation_label == "size":
+                return "do_size"
+            else:
+                return "failed"
+
+    class SayResult(smach.State):
+        def __init__(self):
+            smach.State.__init__(
+                self,
+                outcomes=["succeeded", "failed"],
+                input_keys=[
+                    "operation_label",
+                    "detections_types",
+                    "detections_categories",
+                    "category_dict",
+                    "sorted_size",
+                    "sorted_weights",
+                ],
+                output_keys=["say_text"],
+            )
+
+        def execute(self, userdata):
+            try:
+                if userdata.operation_label == "count":
+                    # Sum the per-category counts rather than counting the
+                    # number of distinct categories.
+                    object_count = sum(userdata.category_dict.values())
+                    userdata.say_text = f"There are {object_count} objects"
+                elif userdata.operation_label == "weight":
+                    heaviest_object = userdata.sorted_weights[0][0]
+                    userdata.say_text = f"The heaviest object is {heaviest_object}"
+                elif userdata.operation_label == "size":
+                    biggest_object = userdata.sorted_size[0][0]
+                    userdata.say_text = f"The biggest object is {biggest_object}"
+                else:
+                    return "failed"
+                return "succeeded"
+            except Exception as e:
+                rospy.logerr(str(e))
+                return "failed"
+
     def __init__(
         self,
         area_polygon: Polygon,
+        operation_label: str,
         depth_topic: str = "/xtion/depth_registered/points",
         model: str = "yolov8n-seg.pt",
         filter: Union[List[str], None] = None,
         confidence: float = 0.5,
         nms: float = 0.3,
-        object: Union[List[dict], None] = None,
+        object_weight: Union[List[dict], None] = None,
     ):
         smach.StateMachine.__init__(
             self,
             outcomes=["succeeded", "failed"],
-            output_keys=["detections_3d", "object_dict"],
+            output_keys=["detections_3d", "object_dict", "say_text"],
         )

+        # Set the operation label in the userdata
+        self.userdata.operation_label = operation_label
+
         with self:
             smach.StateMachine.add(
                 "DETECT_OBJECTS_3D",
@@ -176,7 +202,18 @@ def __init__(
                     confidence=confidence,
                     nms=nms,
                 ),
-                transitions={"succeeded": "COUNTOBJECTS", "failed": "failed"},
+                transitions={"succeeded": "DECIDE_OPERATION", "failed": "failed"},
+            )
+
+            smach.StateMachine.add(
+                "DECIDE_OPERATION",
+                self.DecideOperation(),
+                transitions={
+                    "do_count": "COUNTOBJECTS",
+                    "do_weight": "GETWEIGHT",
+                    "do_size": "GETSIZE",
+                    "failed": "failed",
+                },
             )

             smach.StateMachine.add(
@@ -184,33 +221,49 @@ def __init__(
                 self.CountObjectTypes(area_polygon),
                 transitions={"succeeded": "COUNTCATEGORY", "failed": "failed"},
             )
+
             smach.StateMachine.add(
                 "COUNTCATEGORY",
-                self.CountCategory(object_weight=object),
-                transitions={"succeeded": "GETWEIGHT", "failed": "failed"},
+                self.CountCategory(object_weight=object_weight),
+                transitions={"succeeded": "SAY_RESULT", "failed": "failed"},
             )
+
            smach.StateMachine.add(
                "GETWEIGHT",
-                self.ObjectWeight(object_weight=object),
-                transitions={"succeeded": "GETSIZE", "failed": "failed"},
+                self.ObjectWeight(object_weight=object_weight),
+                transitions={"succeeded": "SAY_RESULT", "failed": "failed"},
            )
+
            smach.StateMachine.add(
                "GETSIZE",
                self.ObjectSize(),
+                transitions={"succeeded": "SAY_RESULT", "failed": "failed"},
+            )
+
+            smach.StateMachine.add(
+                "SAY_RESULT",
+                self.SayResult(),
+                transitions={"succeeded": "SAY", "failed": "failed"},
+            )
+
+            smach.StateMachine.add(
+                "SAY",
+                Say(),
                transitions={"succeeded": "succeeded", "failed": "failed"},
+                remapping={"text": "say_text"},
            )


-if __name__ == "__main__":
-    import rospy
-    from sensor_msgs.msg import PointCloud2
+# if __name__ == "__main__":
+#     import rospy
+#     from sensor_msgs.msg import PointCloud2

-    rospy.init_node("test_object_comparison")
-    weight = rospy.get_param("/Object_list/Object")
+# rospy.init_node("test_object_comparison")
+# weight = rospy.get_param("/Object_list/Object")

-    polygon = Polygon([[-1, 0], [1, 0], [0, 1], [1, 1]])
-    sm = ObjectComparison(Polygon(), filter=["bottle", "cup", "cola"], object=weight)
-    sm.userdata.pcl_msg = rospy.wait_for_message(
-        "/xtion/depth_registered/points", PointCloud2
-    )
-    sm.execute()
+# polygon = Polygon([[-1, 0], [1, 0], [0, 1], [1, 1]])
+# sm = ObjectComparison(
+#     polygon, operation_label="count", filter=["bottle", "cup", "cola"], object_weight=weight
+# )
+# sm.userdata.pcl_msg = rospy.wait_for_message(
+#     "/xtion/depth_registered/points", PointCloud2
+# )
+# sm.execute()
diff --git a/tasks/gpsr/src/gpsr/states/question_answer.py b/tasks/gpsr/src/gpsr/states/question_answer.py
new file mode 100644
index 000000000..c6bcb19df
--- /dev/null
+++ b/tasks/gpsr/src/gpsr/states/question_answer.py
@@ -0,0 +1,50 @@
+import smach
+
+from lasr_skills import AskAndListen, XmlQuestionAnswer, Say
+
+
+class QuestionAnswer(smach.StateMachine):
+
+    def __init__(self, index_path: str, txt_path: str, xml_path: str, k: int = 1):
+        smach.StateMachine.__init__(
+            self,
+            outcomes=["succeeded", "failed"],
+            input_keys=["query_sentence"],
+            output_keys=["closest_answers"],
+        )
+
+        with self:
+
+            smach.StateMachine.add(
+                "GET_QUESTION",
+                AskAndListen(
+                    tts_phrase="I hear you have a question for me. What is it?"
+ ), + transitions={ + "succeeded": "XML_QUESTION_ANSWER", + "failed": "failed", + }, + ) + + smach.StateMachine.add( + "XML_QUESTION_ANSWER", + XmlQuestionAnswer(index_path, txt_path, xml_path, k), + transitions={ + "succeeded": "SAY_ANSWER", + "failed": "failed", + }, + remapping={ + "query_sentence": "transcribed_speech", + }, + ) + + smach.StateMachine.add( + "SAY_ANSWER", + Say(format_str="The answer to your question is: {}"), + transitions={ + "succeeded": "succeeded", + "aborted": "failed", + "preempted": "failed", + }, + remapping={"placeholders": "closest_answers"}, + ) diff --git a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py index 4afb63975..b20bd862e 100644 --- a/tasks/receptionist/src/receptionist/states/get_name_and_drink.py +++ b/tasks/receptionist/src/receptionist/states/get_name_and_drink.py @@ -84,7 +84,6 @@ def __init__( guest_id: str, param_key: str = "/receptionist/priors", ): - smach.State.__init__( self, outcomes=["succeeded", "failed", "failed_name", "failed_drink"], diff --git a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py b/tasks/receptionist/src/receptionist/states/local_speech_recognition.py index 867dba845..5f79f4d83 100644 --- a/tasks/receptionist/src/receptionist/states/local_speech_recognition.py +++ b/tasks/receptionist/src/receptionist/states/local_speech_recognition.py @@ -86,7 +86,6 @@ def handle_drink(sentence_list, last_resort): else: result = handle_similar_sound(sentence_list, available_drinks, 0) print(f"drink (sound): {result}") - if result != "unknown": if result in available_single_drinks: print(f"final attempt drink: {result}")
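For reference, a minimal sketch of driving the new `QuestionAnswer` state machine standalone (not part of the patch; the node name and the three file paths are hypothetical placeholders that should point at the real GPSR question-answer index, corpus, and XML files):

```python
#!/usr/bin/env python3
# Illustrative sketch only -- the node name and paths below are
# hypothetical placeholders, not values defined in this repository.
import rospy

from gpsr.states import QuestionAnswer

if __name__ == "__main__":
    rospy.init_node("test_question_answer")  # hypothetical node name
    sm = QuestionAnswer(
        index_path="gpsr_qa.index",  # placeholder: prebuilt vector index
        txt_path="gpsr_qa.txt",      # placeholder: question corpus
        xml_path="gpsr_qa.xml",      # placeholder: question/answer pairs
        k=1,                         # speak only the single closest answer
    )
    # "query_sentence" is a declared input key of the state machine, so seed
    # it before executing; GET_QUESTION overwrites it from the microphone.
    sm.userdata.query_sentence = ""
    outcome = sm.execute()
    rospy.loginfo(f"QuestionAnswer finished with outcome: {outcome}")
```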