diff --git a/tasks/gpsr/config/adjust_height.yaml b/tasks/gpsr/config/adjust_height.yaml new file mode 100755 index 000000000..d9d33a355 --- /dev/null +++ b/tasks/gpsr/config/adjust_height.yaml @@ -0,0 +1,38 @@ +play_motion: + motions: + higher_3: + joints: [torso_lift_joint] + points: + - positions: [0.30] + time_from_start: 0.0 + higher_2: + joints: [torso_lift_joint] + points: + - positions: [0.26] + time_from_start: 0.0 + higher_1: + joints: [torso_lift_joint] + points: + - positions: [0.24] + time_from_start: 0.0 + middle: + joints: [torso_lift_joint] + points: + - positions: [0.22] + time_from_start: 0.0 + lower_1: + joints: [torso_lift_joint] + points: + - positions: [0.20] + time_from_start: 0.0 + lower_2: + joints: [torso_lift_joint] + points: + - positions: [0.18] + time_from_start: 0.0 + lower_3: + joints: [torso_lift_joint] + points: + - positions: [0.16] + time_from_start: 0.0 + \ No newline at end of file diff --git a/tasks/gpsr/config/find_object.yaml b/tasks/gpsr/config/find_object.yaml new file mode 100755 index 000000000..9a825d450 --- /dev/null +++ b/tasks/gpsr/config/find_object.yaml @@ -0,0 +1,23 @@ +LOCATION_0: + pose: + position: {x: 0.8552300795613897, y: 1.429070868095394, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.9133881598650153, w: 0.40708975597330027} + + +LOCATION_1: + pose: + position: {x: 2.9409194762572146,y: -3.712802433807799,z: 0.0} + orientation: {x: 0.0, y: 0.0, z: -0.611918859029574, w: 0.7909205459235108} + + + +LOCATION_2: + pose: + position: {x: 0.8491182859434898, y: -3.0076084920640334, z: 0.0} + orientation: { x: 0.0, y: 0.0, z: -0.6636264920996592, w: 0.7480640874841546} + +LOCATION_3: + pose: + position: {x: 0.8479247023925237, y: 1.432409182340313, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.8476927158365988, w: 0.530487567731395} + diff --git a/tasks/gpsr/config/guiding_people.yaml b/tasks/gpsr/config/guiding_people.yaml new file mode 100755 index 000000000..59137f006 --- /dev/null +++ b/tasks/gpsr/config/guiding_people.yaml @@ -0,0 +1,40 @@ +Start: + pose: + position: {x: 4.229935896326249, y: 5.989767766362657, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.8755557432741014, w: 0.4831171083906427} + + +WaitPoint1: + pose: + position: {x: 2.3428986531653817, y: 3.3629245249591597, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: -0.5548983151957669, w: 0.8319181809486431} + +TurnAround1: + pose: + position: {x: 2.3362042642374203,y: 3.2752834061488105,z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.6376807566089671, w: 0.7703007546735335} + + +WaitPoint2: + pose: + position: {x: 2.17519261886225, y: -0.36151674936209355, z: 0.0} + orientation: { x: 0.0, y: 0.0, z: -0.7375323735298429, w: 0.6753117783627324} + + +TurnAround2: + pose: + position: {x: 2.157761847255999,y: -0.462103856858122,z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.8652281383725151, w: 0.5013783686682463} + + +Destination: + pose: + position: {x: 2.5710323093426086, y: -2.9194003635449, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: -0.7864753977372808, w: 0.6176216064500868} + + +TurnAroundDestination: + pose: + position: {x: 2.546711405659906, y: -3.012970231861997, z: 0.0} + orientation: {x: 0.0, y: 0.0, z: 0.7369260641412018, w: 0.6759733544965788} + diff --git a/tasks/gpsr/config/object_weight.yaml b/tasks/gpsr/config/object_weight.yaml new file mode 100755 index 000000000..391ce9663 --- /dev/null +++ b/tasks/gpsr/config/object_weight.yaml @@ -0,0 +1,55 @@ +Object_list: + Object: + Cleaning_supplies: + sponge: 50 + clanser: 500 + + Toys: + tennis_ball: 58 + soccer_ball: 430 + dice: 8 + baseball: 145 + rubiks_cube: 98 + + Dishes: + fork: 30 + knife: 80 + spoon: 50 + plate: 500 + bowl: 300 + cup: 250 + bottle: 300 + + Drinks: + red_wine: 1200 + cola: 380 + juice_pack: 200 + tropical_juice: 1200 + milk: 1000 + iced_tea: 340 + orange_juice: 1200 + + Food: + cereal: 500 + tuna: 165 + spam: 340 + mustard: 250 + strawberry_jello: 85 + chocolate_jello: 85 + coffee_grounds: 250 + sugar: 1000 + + Fruits: + apple: 150 + orange: 200 + pear: 180 + banana: 120 + strawberry: 10 + lemon: 110 + peach: 150 + plum: 80 + + Snacks: + pringles: 150 + cornflakes: 500 + cheezit: 200 \ No newline at end of file diff --git a/tasks/gpsr/config/play_motion.yaml b/tasks/gpsr/config/play_motion.yaml new file mode 100755 index 000000000..7db1cdc58 --- /dev/null +++ b/tasks/gpsr/config/play_motion.yaml @@ -0,0 +1,45 @@ +play_motion: + motions: + look_down: + joints: [head_1_joint, head_2_joint] + points: + - positions: [0.0, -0.2] + time_from_start: 0.0 + # look_left: + # joints: [head_1_joint, head_2_joint] + # points: + # - positions: [0.5, -0.25] + # time_from_start: 0.0 + look_down_left: + joints: [head_1_joint, head_2_joint] + points: + - positions: [0.5, -0.55] + time_from_start: 0.0 + # look_right: + # joints: [head_1_joint, head_2_joint] + # points: + # - positions: [-0.5, -0.25] + # time_from_start: 0.0 + look_down_right: + joints: [head_1_joint, head_2_joint] + points: + - positions: [-0.5, -0.55] + time_from_start: 0.0 + + look_center: + joints: [head_1_joint, head_2_joint] + points: + - positions: [0.0, 0.0] + time_from_start: 0.0 + + look_left: + joints: [head_1_joint, head_2_joint] + points: + - positions: [0.5, 0.0] + time_from_start: 0.0 + + look_right: + joints: [head_1_joint, head_2_joint] + points: + - positions: [-0.5, 0.0] + time_from_start: 0.0 \ No newline at end of file diff --git a/tasks/gpsr/src/gpsr/states/adjust_base.py b/tasks/gpsr/src/gpsr/states/adjust_base.py new file mode 100755 index 000000000..f613dd7cc --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/adjust_base.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +import rospy +import smach +from geometry_msgs.msg import Twist +from lasr_skills.vision import GetImage +from lasr_vision_msgs.srv import BodyPixDetection, BodyPixDetectionRequest +from lasr_vision_msgs.msg import BodyPixMaskRequest +from time import sleep + + +class Rotate(smach.State): + def __init__(self, angular_speed, duration=1): + smach.State.__init__(self, outcomes=["succeeded", "failed"]) + self.angular_speed = angular_speed + self.duration = duration # Duration for which to rotate + + def execute(self, userdata): + rospy.loginfo("Rotating") + pub = rospy.Publisher( + "/mobile_base_controller/cmd_vel", Twist, queue_size=10, latch=True + ) + + # Construct the twist message with the desired angular speed + twist = Twist() + twist.angular.z = self.angular_speed + + try: + # Rotate for the specified duration + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < self.duration: + pub.publish(twist) + sleep(0.1) # Publish at 10 Hz + + # Stop the robot after rotating + twist.angular.z = 0 + pub.publish(twist) + return "succeeded" + except rospy.ROSInterruptException: + rospy.loginfo("ROS Interrupt Exception") + return "failed" + + +class DetectHipCenter(smach.State): + """ + State for detecting gestures. + """ + + def __init__(self, gesture_to_detect: str = "raising_left_hand"): + self.gesture_to_detect = gesture_to_detect + smach.State.__init__( + self, + outcomes=["move_left", "move_right", "stay_same", "failed"], + input_keys=["img_msg"], + ) + + self.body_pix_client = rospy.ServiceProxy("/bodypix/detect", BodyPixDetection) + + def execute(self, userdata): + + body_pix_masks = BodyPixMaskRequest() + body_pix_masks.parts = ["left_hip", "right_hip"] + masks = [body_pix_masks] + + req = BodyPixDetectionRequest() + req.image_raw = userdata.img_msg + req.masks = masks + req.dataset = "resnet50" + req.confidence = 0.7 + + try: + res = self.body_pix_client(req) + except Exception as e: + print(e) + return "failed" + + part_info = {} + poses = res.poses + + if not poses: + rospy.logerr("No poses available in the response.") + return "failed" + + for pose in poses: + for keypoint in pose.keypoints: + if pose is None or pose.keypoints is None: + rospy.logerr("Pose or keypoints data is missing.") + return "failed" # Skip this iteration if data is missing + else: + part_info[keypoint.part] = { + "x": keypoint.xy[0], + "y": keypoint.xy[1], + "score": keypoint.score, + } + + print(part_info["leftHip"]["x"]) + print(part_info["rightHip"]["x"]) + if (part_info["leftHip"]["x"] + part_info["rightHip"]["x"]) / 2 < 300: + return "move_left" + elif (part_info["leftHip"]["x"] + part_info["rightHip"]["x"]) / 2 > 340: + return "move_right" + elif 300 < (part_info["leftHip"]["x"] + part_info["rightHip"]["x"]) / 2 < 340: + return "stay_same" + else: + return "failed" + + +class AdjustBase(smach.StateMachine): + def __init__(self): + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + with self: + smach.StateMachine.add( + "GET_IMAGE", + GetImage(), + transitions={"succeeded": "BODY_PIX_DETECTION", "failed": "failed"}, + ) + + smach.StateMachine.add( + "BODY_PIX_DETECTION", + DetectHipCenter(), + transitions={ + "move_left": "ADJUST_BASE_LEFT", + "move_right": "ADJUST_BASE_RIGHT", + "stay_same": "succeeded", + "failed": "GET_IMAGE", + }, + ) + + smach.StateMachine.add( + "ADJUST_BASE_LEFT", + Rotate(0.1), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "ADJUST_BASE_RIGHT", + Rotate(-0.1), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + +if __name__ == "__main__": + rospy.init_node("adjust_base") + while not rospy.is_shutdown(): + sm = AdjustBase() + sm.execute() + rospy.spin() diff --git a/tasks/gpsr/src/gpsr/states/adjust_gesture.py b/tasks/gpsr/src/gpsr/states/adjust_gesture.py new file mode 100755 index 000000000..cb453e193 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/adjust_gesture.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +import rospy +import smach +from adjust_base import AdjustBase +from adjust_torso import TorsoHeight +from adjust_head import AdjustHeadTilt +from lasr_vision_msgs.srv import BodyPixDetection, BodyPixDetectionRequest +from lasr_vision_msgs.msg import BodyPixMaskRequest +from lasr_skills.vision import GetImage + + +class DetectNose(smach.State): + """ + State for detecting gestures. + # adjust base + # detect nose --> if it in the image--> head up + # --> if not adjust the torso until we can see the nose --> adjust head + """ + + def __init__(self): + smach.State.__init__( + self, + outcomes=["move_up", "move_down", "stay_same", "no_person", "no_nose"], + input_keys=["img_msg"], + ) + + self.body_pix_client = rospy.ServiceProxy("/bodypix/detect", BodyPixDetection) + + def execute(self, userdata): + + body_pix_masks = BodyPixMaskRequest() + body_pix_masks.parts = ["nose"] + masks = [body_pix_masks] + + req = BodyPixDetectionRequest() + req.image_raw = userdata.img_msg + req.masks = masks + req.dataset = "resnet50" + req.confidence = 0.7 + + try: + res = self.body_pix_client(req) + except Exception as e: + print(e) + return "failed" + + part_info = {} + poses = res.poses + if not poses: + rospy.logerr("No poses available in the response. No person detected.") + return "no_person" + + for pose in poses: + for keypoint in pose.keypoints: + if pose is None or pose.keypoints is None: + rospy.logerr("No nose available in the response.") + return "no_nose" # Skip this iteration if data is missing + else: + part_info[keypoint.part] = { + "x": keypoint.xy[0], + "y": keypoint.xy[1], + "score": keypoint.score, + } + + # img (640 480) origin of coordinate: upperleft + + if (part_info["nose"]["x"]) < 220: + return "move_down" + elif (part_info["nose"]["x"]) > 260: + return "move_up" + else: + return "stay_same" + + +class AdjustGesture(smach.StateMachine): + def __init__(self): + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + with self: + smach.StateMachine.add( + "ADJUST_BASE", + AdjustBase(), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "GET_IMAGE", + GetImage(), + transitions={"succeeded": "DETECT_NOSE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "DETECT_NOSE", + DetectNose(), + transitions={ + "move_up": "HEAD_MOVE_UP", + "move_down": "HEAD_MOVE_DOWN", + "stay_same": "suceeded", + "no_nose": "ADJUST_TORSO", + "no_person": "GET_IMAGE", + }, + ) + + smach.StateMachine.add( + "HEAD_MOVE_UP", + AdjustHeadTilt(0.1, 1), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "HEAD_MOVE_DOWN", + AdjustHeadTilt(0.1, 1), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + smach.StateMachine.add( + "ADJUST_TORSO", + TorsoHeight(-0.1, 1), + transitions={"succeeded": "GET_IMAGE", "failed": "failed"}, + ) + + +if __name__ == "__main__": + rospy.init_node("adjust_gesture") + while not rospy.is_shutdown(): + sm = AdjustGesture() + sm.execute() + rospy.spin() diff --git a/tasks/gpsr/src/gpsr/states/adjust_head.py b/tasks/gpsr/src/gpsr/states/adjust_head.py new file mode 100755 index 000000000..5f55917b9 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/adjust_head.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +import rospy +import smach +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint + + +class AdjustHeadTilt(smach.State): + def __init__(self, tilt, duration): + smach.State.__init__(self, outcomes=["succeeded", "failed"]) + self.tilt = tilt + self.duration = duration + self.pub = rospy.Publisher( + "/head_controller/command", JointTrajectory, queue_size=10 + ) + + def execute(self): + if not rospy.is_shutdown(): + rospy.loginfo("Adjusting head tilt to %s radians", self.tilt) + trajectory = JointTrajectory() + trajectory.header.stamp = rospy.Time.now() + trajectory.joint_names = ["head_2_joint"] + + point = JointTrajectoryPoint() + point.positions = [self.tilt] + point.time_from_start = rospy.Duration(self.duration) + trajectory.points = [point] + + start_time = rospy.Time.now() + rate = rospy.Rate(50) # Use a higher frequency for trajectory commands + + while (rospy.Time.now() - start_time).to_sec() < self.duration: + if rospy.is_shutdown(): + rospy.loginfo("ROS node shutdown detected") + return "failed" + self.pub.publish(trajectory) + rate.sleep() + + return "succeeded" + else: + rospy.loginfo("ROS node shutdown detected") + return "failed" + + +if __name__ == "__main__": + rospy.init_node("adjust_head") + # heads up--> 0.1 + # heads down --> -0,1 + sm = AdjustHeadTilt(0.1, 1) + outcome = sm.execute() + rospy.spin() diff --git a/tasks/gpsr/src/gpsr/states/adjust_torso.py b/tasks/gpsr/src/gpsr/states/adjust_torso.py new file mode 100755 index 000000000..3489a1157 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/adjust_torso.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +import rospy +import smach +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint +from lasr_skills.vision import GetImage + + +class TorsoHeight(smach.State): + def __init__(self, target_height, duration): + smach.State.__init__(self, outcomes=["succeeded", "failed"]) + self.target_height = target_height + self.duration = duration + + def execute(self, userdata): + rospy.loginfo("Adjusting torso height to %s meters", self.target_height) + pub = rospy.Publisher( + "/torso_controller/command", JointTrajectory, queue_size=10, latch=True + ) + trajectory = JointTrajectory() + trajectory.header.stamp = rospy.Time.now() + trajectory.joint_names = ["torso_lift_joint"] + + point = JointTrajectoryPoint() + point.positions = [self.target_height] + # point.time_from_start.nsecs = 1000000 + trajectory.points = [point] + + try: + # Rotate for the specified duration + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < self.duration: + # Publish the command + print(trajectory) + pub.publish(trajectory) + rospy.sleep(0.1) + return ( + "succeeded" # Wait for the duration of the movement plus a little extra + ) + except rospy.ROSInterruptException: + rospy.loginfo("ROS Interrupt Exception") + return "failed" + + +if __name__ == "__main__": + rospy.init_node("adjust_torso") + while not rospy.is_shutdown(): + sm = TorsoHeight(-0.1, 1) + sm.execute() + rospy.spin() diff --git a/tasks/gpsr/src/gpsr/states/detect_corner.py b/tasks/gpsr/src/gpsr/states/detect_corner.py new file mode 100644 index 000000000..607cc4061 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/detect_corner.py @@ -0,0 +1,110 @@ +import smach +import rospy +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu +import math + + +class DetectTurn(smach.State): + """ + State for detecting Tiago is turning around <-- for guiding people. + """ + + def __init__(self): + smach.State.__init__(self, outcomes=["turn_detected", "no_turn"]) + self.sub_odom = rospy.Subscriber("/odom", Odometry, self.odom_callback) + self.sub_imu = rospy.Subscriber("/imu", Imu, self.imu_callback) + self.prev_orientation = None + self.turn_detected = False + self.turn_threshold = math.radians(45) # 45 degrees threshold + + def odom_callback(self, msg): + orientation_q = msg.pose.pose.orientation + (roll, pitch, yaw) = self.quaternion_to_euler( + orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w + ) + + if self.prev_orientation is not None: + yaw_diff = self.angle_diff(yaw, self.prev_orientation) + if abs(yaw_diff) > self.turn_threshold: + rospy.loginfo( + "Turn detected: {:.2f} degrees".format(math.degrees(yaw_diff)) + ) + self.turn_detected = True + + self.prev_orientation = yaw + + def imu_callback(self, msg): + angular_velocity_z = msg.angular_velocity.z + if abs(angular_velocity_z) > self.turn_threshold: + rospy.loginfo( + "Turn detected from IMU: {:.2f} degrees/sec".format( + math.degrees(angular_velocity_z) + ) + ) + self.turn_detected = True + + def quaternion_to_euler(self, x, y, z, w): + t0 = +2.0 * (w * x + y * z) + t1 = +1.0 - 2.0 * (x * x + y * y) + roll = math.atan2(t0, t1) + + t2 = +2.0 * (w * y - z * x) + t2 = +1.0 if t2 > +1.0 else t2 + t2 = -1.0 if t2 < -1.0 else t2 + pitch = math.asin(t2) + + t3 = +2.0 * (w * z + x * y) + t4 = +1.0 - 2.0 * (y * y + z * z) + yaw = math.atan2(t3, t4) + + return roll, pitch, yaw + + def angle_diff(self, a, b): + diff = a - b + while diff < -math.pi: + diff += 2 * math.pi + while diff > math.pi: + diff -= 2 * math.pi + return diff + + def execute(self, userdata): + if self.turn_detected: + self.turn_detected = False # Reset flag for next detection + return "turn_detected" + else: + return "no_turn" + + +class TrackDistance(smach.State): + """ + State for detecting Tiago moved a certain distance <-- for guiding people. + """ + + def __init__(self, target_distance): + smach.State.__init__(self, outcomes=["distance_reached", "failed"]) + self.target_distance = target_distance + self.start_position = None + self.sub_odom = rospy.Subscriber("/odom", Odometry, self.odom_callback) + + def odom_callback(self, msg): + if self.start_position is None: + self.start_position = msg.pose.pose.position + else: + current_position = msg.pose.pose.position + distance_travelled = self.calculate_distance( + self.start_position, current_position + ) + if distance_travelled >= self.target_distance: + self.start_position = None # Reset for next distance tracking + self.sub_odom.unregister() # Stop the subscriber + self.distance_reached = True + + def calculate_distance(self, start, end): + return math.sqrt((start.x - end.x) ** 2 + (start.y - end.y) ** 2) + + def execute(self, userdata): + self.distance_reached = False + while not self.distance_reached: + rospy.sleep(0.1) + return "distance_reached" diff --git a/tasks/gpsr/src/gpsr/states/go_find_the_object.py b/tasks/gpsr/src/gpsr/states/go_find_the_object.py new file mode 100755 index 000000000..99f365db8 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/go_find_the_object.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +from go_to_location import GoToLocation +import smach +from lasr_skills import Detect3D +from shapely.geometry.polygon import Polygon +from typing import List, Union +from geometry_msgs.msg import Pose, Point, Quaternion +from lasr_skills import Say, PlayMotion + +""" +location = rospy.get_param("/start") -> python dict +Pose(position : Point, orientation : Quaternion) +pose = Pose(position=Point(**location['pose']['position'], orientation=Quaternion(**location['pose']['orientation'])) +""" + + +class Go_find_the_object(smach.StateMachine): + class check_objects(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["detections_3d"], + output_keys=["detection_result"], + ) + + def move_around(self, detections): + print("we're checking on the object") + if len(detections) == 0: + result = False + else: + result = True + return result + + def execute(self, userdata): + filtered_detections = ( + userdata.detections_3d + ) # the outcome of the 3d detection + print(filtered_detections) + object_list = list() + for i in filtered_detections.detected_objects: + object_list.append(i.name) + result = self.move_around(object_list) + userdata.detection_result = result + rospy.loginfo(filtered_detections) + + return "succeeded" + + class cumulate_result(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["detection_result", "cumulated_result"], + output_keys=["cumulated_result"], + ) + + def execute(self, userdata): + if "cumulated_result" not in userdata: + userdata.cumulated_result = list() + userdata.cumulated_result.append(userdata.detection_result) + else: + userdata.cumulated_result.append( + userdata.detection_result + ) # the outcome of the 3d detection + return "succeeded" + + class detection_result(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["object_found", "object_not_found", "failed"], + input_keys=["cumulated_result"], + output_keys=["result", "cumulated_result"], + ) + + def execute(self, userdata): + if any(userdata.cumulated_result): + userdata.result = True + userdata.cumulated_result = list() + return "object_found" + else: + userdata.cumulated_result = list() + userdata.result = False + return "object_not_found" + + def __init__( + self, + area_polygon: Polygon, + depth_topic: str = "/xtion/depth_registered/points", + model: str = "yolov8n-seg.pt", + filter: Union[List[str], None] = None, + locations: Union[List[dict], None] = None, + confidence: float = 0.5, + nms: float = 0.3, + motions: Union[List[str], None] = None, + ): + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + + say_object_found = Say("I found the object here!") + say_object_not_found = Say("the object is not here") + + with self: + state = 0 + + # Outer loop iterates over locations + for i, location in enumerate(locations): + pose = Pose( + position=Point(**location["pose"]["position"]), + orientation=Quaternion(**location["pose"]["orientation"]), + ) + + # Add the initial state for going to this location + smach.StateMachine.add( + f"LOCATION_{i}", + GoToLocation(pose), + transitions={ + "succeeded": f"LOOK_AROUND_{state}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + # Inner loop iterates over motions + for index, direction in enumerate(motions): + print(direction) + # Add a state to perform the current motion + smach.StateMachine.add( + f"LOOK_AROUND_{state}", + PlayMotion(motion_name=direction), + transitions={ + "succeeded": f"DETECT_OBJECTS_3D_{state}", + "aborted": f"DETECT_OBJECTS_3D_{state}", + "preempted": "failed", + }, + ) + + # Add a state to detect 3D objects after performing the motion + smach.StateMachine.add( + f"DETECT_OBJECTS_3D_{state}", + Detect3D( + depth_topic=depth_topic, + model=model, + filter=filter, + confidence=confidence, + nms=nms, + ), + transitions={ + "succeeded": f"RESULT_{state}", + "failed": "failed", + }, + ) + + # Add a state to check objects + smach.StateMachine.add( + f"RESULT_{state}", + self.check_objects(), + transitions={ + "succeeded": f"SAVE_RESULT_{state}", + "failed": "failed", + }, + ) + smach.StateMachine.add( + f"SAVE_RESULT_{state}", + self.cumulate_result(), + transitions={ + "succeeded": ( + f"LOOK_AROUND_{state+1}" + if index != (len(motions) - 1) + else f"CHECK_RESULT_{i}" + ), + "failed": "failed", + }, + ) + state += 1 + + smach.StateMachine.add( + f"CHECK_RESULT_{i}", + self.detection_result(), + transitions={ + "object_found": f"SAY_OBJECT_FOUND_{i}", + "object_not_found": f"SAY_OBJECT_NOT_FOUND_{i}", + "failed": "failed", + }, + ) + # Add states to handle object found or not found results + smach.StateMachine.add( + f"SAY_OBJECT_FOUND_{i}", + say_object_found, + transitions={ + "succeeded": ( + f"LOCATION_{i+1}" + if i != len(locations) - 1 + else "succeeded" + ), + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + f"SAY_OBJECT_NOT_FOUND_{i}", + say_object_not_found, + transitions={ + "succeeded": ( + f"LOCATION_{i+1}" + if i != len(locations) - 1 + else "succeeded" + ), + "aborted": "failed", + "preempted": "failed", + }, + ) + + +if __name__ == "__main__": + import rospy + from sensor_msgs.msg import PointCloud2 + + location_list = list() + for i in range(3): + location = rospy.get_param(f"/LOCATION_{i}") + location_list.append(location) + motion = ["look_down", "look_down_left", "look_down_right"] + rospy.init_node("test_find_object") + sm = Go_find_the_object( + Polygon(), filter=["cup"], locations=location_list, motions=motion + ) + sm.userdata.pcl_msg = rospy.wait_for_message( + "/xtion/depth_registered/points", PointCloud2 + ) + sm.execute() diff --git a/tasks/gpsr/src/gpsr/states/go_to_location.py b/tasks/gpsr/src/gpsr/states/go_to_location.py new file mode 100755 index 000000000..79f16c041 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/go_to_location.py @@ -0,0 +1,32 @@ +import smach_ros + +from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal +from geometry_msgs.msg import Pose, PoseStamped +from std_msgs.msg import Header + +from typing import Union + + +class GoToLocation(smach_ros.SimpleActionState): + def __init__(self, location: Union[Pose, None] = None): + if location is not None: + super(GoToLocation, self).__init__( + "move_base", + MoveBaseAction, + goal=MoveBaseGoal( + target_pose=PoseStamped( + pose=location, header=Header(frame_id="map") + ) + ), + ) + else: + super(GoToLocation, self).__init__( + "move_base", + MoveBaseAction, + goal_cb=lambda ud, _: MoveBaseGoal( + target_pose=PoseStamped( + pose=ud.location, header=Header(frame_id="map") + ) + ), + input_keys=["location"], + ) diff --git a/tasks/gpsr/src/gpsr/states/guide_people.py b/tasks/gpsr/src/gpsr/states/guide_people.py new file mode 100755 index 000000000..29c8399c0 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/guide_people.py @@ -0,0 +1,380 @@ +#!/usr/bin/env python3 +from go_to_location import GoToLocation +import smach +from lasr_skills import Detect3D, Detect3DInArea +from shapely.geometry.polygon import Polygon +from typing import List, Union, Dict +from geometry_msgs.msg import Pose, Point, Quaternion +from lasr_skills import Say, PlayMotion + + +""" +location = rospy.get_param("/start") -> python dict +Pose(position : Point, orientation : Quaternion) +pose = Pose(position=Point(**location['pose']['position'], orientation=Quaternion(**location['pose']['orientation'])) +""" + + +class WaitForPersonInArea(smach.StateMachine): + + class CheckForPerson(smach.State): + + def __init__(self): + smach.State.__init__( + self, outcomes=["done", "not_done"], input_keys=["detections_3d"] + ) + + def execute(self, userdata): + if len(userdata.detections_3d): + return "done" + else: + return "not_done" + + def __init__(self, area_polygon: Polygon): + smach.StateMachine.__init__( + self, + outcomes=["find_person", "not_find_person", "failed"], + output_keys=["detections_3d"], + ) + + with self: + smach.StateMachine.add( + "DETECT_PEOPLE_3D", + Detect3DInArea(area_polygon=area_polygon, filter=["person"]), + transitions={"succeeded": "CHECK_FOR_PERSON", "failed": "failed"}, + ) + smach.StateMachine.add( + "CHECK_FOR_PERSON", + self.CheckForPerson(), + transitions={"done": "find_person", "not_done": "not_find_person"}, + ) + + +class Guiding_people(smach.StateMachine): + class Detect_people(smach.State): + # move head & detect people + def __init__(self): + smach.State.__init__( + self, + outcomes=["people_found", "people_not_found", "failed"], + input_keys=["detections_3d"], + output_keys=["detection_result"], + ) + + def look_around(self, detections): + print("we're checking on the follower") + if len(detections) == 0: + result = False + else: + result = True + return result + + def execute(self, userdata): + filtered_detections = ( + userdata.detections_3d + ) # the outcome of the 3d detection + print(filtered_detections) + object_list = filtered_detections + result = self.look_around(object_list) + userdata.detection_result = result + rospy.loginfo(filtered_detections) + if result: + return "people_found" + else: + return "people_not_found" + + class cumulate_result(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["detection_result", "cumulated_result"], + output_keys=["cumulated_result"], + ) + + def execute(self, userdata): + if "cumulated_result" not in userdata: + userdata.cumulated_result = list() + userdata.cumulated_result.append(userdata.detection_result) + else: + userdata.cumulated_result.append( + userdata.detection_result + ) # the outcome of the 3d detection + return "succeeded" + + class detection_result(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["check_found", "check_not_found", "failed"], + input_keys=["cumulated_result"], + output_keys=["result", "cumulated_result"], + ) + + def execute(self, userdata): + if any(userdata.cumulated_result): + userdata.result = True + userdata.cumulated_result = list() + return "check_found" + else: + userdata.cumulated_result = list() + userdata.result = False + return "check_not_found" + + def __init__( + self, + locations: Union[List[dict], None] = None, + motions: Union[List[str], None] = None, + waiting_area: Union[List[Polygon], None] = None, + turning_point: Union[List[dict], None] = None, + ): + smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"]) + + with self: + # 1. Start & greeting + start_pose = Pose( + position=Point(**locations[0]["pose"]["position"]), + orientation=Quaternion(**locations[0]["pose"]["orientation"]), + ) + smach.StateMachine.add( + "START_GUIDING", + GoToLocation(start_pose), + transitions={ + "succeeded": "GREETING_GUEST_FIRST", + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + "GREETING_GUEST_FIRST", + Say(text="I am waiting for a person."), + transitions={ + "succeeded": "WAIT_FOR_PERSON_GUEST_1", + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + "WAIT_FOR_PERSON_GUEST_1", + PlayMotion(motion_name=motions[0]), + transitions={ + "succeeded": "GREETING_GUEST", + "aborted": "GREETING_GUEST", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + "GREETING_GUEST", + Say(text="Please stand in front of me."), + transitions={ + "succeeded": "DETECT_OBJECTS_3D_START", + "aborted": "failed", + "preempted": "failed", + }, + ) + + # wait for person in area + smach.StateMachine.add( + "DETECT_OBJECTS_3D_START", + WaitForPersonInArea(waiting_area[0]), + transitions={ + "find_person": "GREETING_GUEST_YES", + "not_find_person": "GREETING_GUEST_NO", + "failed": "failed", + }, + ) + smach.StateMachine.add( + "GREETING_GUEST_YES", + Say(text="Hello, nice to meet you, please follow me."), + transitions={ + "succeeded": "START_GUIDING_0", + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + "GREETING_GUEST_NO", + Say(text="I didn't see anyone."), + transitions={ + "succeeded": "GREETING_GUEST", + "aborted": "failed", + "preempted": "failed", + }, + ) + + # 2. Go to locations + for i, location in enumerate(locations[1:-1]): + pose = Pose( + position=Point(**location["pose"]["position"]), + orientation=Quaternion(**location["pose"]["orientation"]), + ) + smach.StateMachine.add( + f"START_GUIDING_{i}", + GoToLocation(pose), + transitions={ + "succeeded": f"TURNING_AROUND{i}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + turn_pose = Pose( + position=Point(**turning_point[i]["pose"]["position"]), + orientation=Quaternion(**turning_point[i]["pose"]["orientation"]), + ) + smach.StateMachine.add( + f"TURNING_AROUND{i}", + GoToLocation(turn_pose), + transitions={ + "succeeded": f"CHECK_AROUND_{i}_0", + "aborted": "failed", + "preempted": "failed", + }, + ) + + state = 0 + for index, motion in enumerate(motions): + smach.StateMachine.add( + f"CHECK_AROUND_{i}_{state}", + PlayMotion(motion_name=motion), + transitions={ + "succeeded": f"DETECT_OBJECTS_{i}_{state}", + "aborted": f"DETECT_OBJECTS_{i}_{state}", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + f"DETECT_OBJECTS_{i}_{state}", + WaitForPersonInArea(waiting_area[i + 1]), + transitions={ + "find_person": f"RESULT_{i}_{state}", + "not_find_person": f"RESULT_{i}_{state}", + "failed": "failed", + }, + ) + smach.StateMachine.add( + f"RESULT_{i}_{state}", + self.Detect_people(), + transitions={ + "people_found": f"SAVE_RESULT_{i}_{state}", + "people_not_found": f"SAVE_RESULT_{i}_{state}", + "failed": "failed", + }, + ) + smach.StateMachine.add( + f"SAVE_RESULT_{i}_{state}", + self.cumulate_result(), + transitions={ + "succeeded": ( + f"CHECK_AROUND_{i}_{state+1}" + if index != (len(motions) - 1) + else f"CHECK_RESULT_{i}" + ), + "failed": "failed", + }, + ) + state += 1 + + smach.StateMachine.add( + f"CHECK_RESULT_{i}", + self.detection_result(), + transitions={ + "check_found": f"GREETING_GUEST_YES_{i}", + "check_not_found": f"GREETING_GUEST_NO_{i}", + "failed": "failed", + }, + ) + smach.StateMachine.add( + f"GREETING_GUEST_YES_{i}", + Say(text="Let's move on to the destination."), + transitions={ + "succeeded": ( + f"START_GUIDING_{i+1}" + if index != (len(locations[1:-1])) + else "GUIDING_DESTINATION" + ), + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + f"GREETING_GUEST_NO_{i}", + Say(text="Hey, where are you? please follow me."), + transitions={ + "succeeded": f"CHECK_AROUND_{i}_{state-2}", + "aborted": "failed", + "preempted": "failed", + }, + ) + + # 3. Final destination + end_pose = Pose( + position=Point(**locations[-1]["pose"]["position"]), + orientation=Quaternion(**locations[-1]["pose"]["orientation"]), + ) + smach.StateMachine.add( + f"GUIDING_DESTINATION", + GoToLocation(end_pose), + transitions={ + "succeeded": "TURN_BACK", + "aborted": "failed", + "preempted": "failed", + }, + ) + end_turn_pose = Pose( + position=Point(**turning_point[-1]["pose"]["position"]), + orientation=Quaternion(**turning_point[-1]["pose"]["orientation"]), + ) + smach.StateMachine.add( + f"TURN_BACK", + GoToLocation(end_turn_pose), + transitions={ + "succeeded": "SAY_ARRIVED", + "aborted": "failed", + "preempted": "failed", + }, + ) + smach.StateMachine.add( + "SAY_ARRIVED", + Say(text="We've arrived at the destination. See you."), + transitions={ + "succeeded": "succeeded", + "aborted": "failed", + "preempted": "failed", + }, + ) + + +if __name__ == "__main__": + import rospy + from sensor_msgs.msg import PointCloud2 + + location = [ + rospy.get_param("/Start"), + rospy.get_param("/WaitPoint1"), + rospy.get_param("/WaitPoint2"), + rospy.get_param(f"/Destination"), + ] + turn_around = [ + rospy.get_param(f"/TurnAround1"), + rospy.get_param(f"/TurnAround2"), + rospy.get_param("/TurnAroundDestination"), + ] + motion = ["look_center", "look_left", "look_right"] + point_1 = Polygon([[2.72, 7.01], [5.09, 7.57], [5.14, 6.13], [3.64, 5.51]]) + point_2 = Polygon([[1.35, 5.15], [2.47, 5.4], [2.78, 4.25], [1.69, 3.96]]) + point_3 = Polygon([[1.11, 1.09], [3.36, 1.73], [3.73, -0.03], [1.53, -0.60]]) + point = [point_1, point_2, point_3] + rospy.init_node("guiding_people") + sm = Guiding_people( + locations=location, + motions=motion, + turning_point=turn_around, + waiting_area=point, + ) + sm.userdata.pcl_msg = rospy.wait_for_message( + "/xtion/depth_registered/points", PointCloud2 + ) + sm.execute() + + # wait for person in area diff --git a/tasks/gpsr/src/gpsr/states/object_comparison.py b/tasks/gpsr/src/gpsr/states/object_comparison.py new file mode 100755 index 000000000..579c34a16 --- /dev/null +++ b/tasks/gpsr/src/gpsr/states/object_comparison.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +import smach +from lasr_skills import Detect3DInArea +from lasr_skills import Detect3D +from lasr_skills import Detect +from shapely.geometry.polygon import Polygon +from typing import List, Union, Dict + + +class ObjectComparison(smach.StateMachine): + class CountObjectTypes(smach.State): + def __init__(self, area_polygon: Polygon): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["detections_3d"], + output_keys=["detections_types", "object_dict"], + ) + + def count_types(self, detections): + """ + Count the number of different object types in detections. + + :param detections: List of detection tuples (object_type, position) + :return: Dictionary with object types as keys and counts as values + """ + object_counts = {} + for detection in detections: + object_type = detection.name + + if len(object_counts.keys()) == 0: + object_counts[object_type] = 1 + else: + if object_type in object_counts.keys(): + object_counts[object_type] += 1 + else: + object_counts[object_type] = 1 + return object_counts + + def execute(self, userdata): + print(userdata.detections_3d) + filtered_detections = userdata.detections_3d + rospy.loginfo(filtered_detections) + rospy.loginfo(type(filtered_detections.detected_objects[0])) + object_counts = self.count_types(filtered_detections.detected_objects) + print(object_counts) + userdata.object_dict = object_counts + userdata.detections_types = object_counts.keys() + return "succeeded" + + class CountCategory(smach.State): + def __init__(self, object_weight: Union[List[dict], None] = None): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["object_dict"], + output_keys=["category_dict", "detections_categories"], + ) + self.object_weight = object_weight + + def count_category(self, object_dictionary, count_object): + category_count = {category: 0 for category in object_dictionary.keys()} + for category, items in object_dictionary.items(): + for obj in count_object.keys(): + if obj in items: + category_count[category] += count_object[obj] + return category_count + + def execute(self, userdata): + detected_objects = userdata.object_dict + counts = self.count_category(self.object_weight, detected_objects) + category_counts = { + key: value for key, value in counts.items() if value != 0 + } + print("finished counting") + print(category_counts) + userdata.category_dict = category_counts + userdata.detections_categories = category_counts.keys() + return "succeeded" + + class ObjectWeight(smach.State): + def __init__(self, object_weight: Union[List[dict], None] = None): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["detections_types"], + output_keys=["sorted_weights"], + ) + self.object_weight = object_weight + + def get_weight(self, detections, average_weights): + """ + rank the weight of a list of objects. + + :param detections: List of detection tuples (object_type, position) + :return: a list of the weight rank of object in a category(max->min) + """ + weight_dict = {} + for category, items in average_weights.items(): + for i in detections: + if i in items: + weight = items[i] + weight_dict[i] = weight + + return weight_dict + + def execute(self, userdata): + weights_dict = self.get_weight( + userdata.detections_types, self.object_weight + ) + sorted_weights = sorted( + weights_dict.items(), key=lambda item: item[1], reverse=True + ) + userdata.sorted_weights = sorted_weights + print(sorted_weights) + return "succeeded" + + class ObjectSize(smach.State): + def __init__(self): + smach.State.__init__( + self, + outcomes=["succeeded", "failed"], + input_keys=["object_dict", "detections_3d"], + output_keys=["sorted_size"], + ) + + def property_size_calculation(self, detections, result): + """ + calculate the size of a list of objects using bounding box. + :param detections: List of detection tuples (object_type, position) + :return: a list of the size of each object in a category + """ + + area = dict() + for i in detections: + for object in result.detected_objects: + if i == object.name: + area[i] = object.xywh[2] * object.xywh[3] + return area + + def execute(self, userdata): + detections_types = userdata.object_dict.keys() + area_dict = self.property_size_calculation( + detections_types, userdata.detections_3d + ) + sorted_size = sorted( + area_dict.items(), key=lambda item: item[1], reverse=True + ) + userdata.sorted_size = sorted_size + print(sorted_size) + return "succeeded" + + def __init__( + self, + area_polygon: Polygon, + depth_topic: str = "/xtion/depth_registered/points", + model: str = "yolov8n-seg.pt", + filter: Union[List[str], None] = None, + confidence: float = 0.5, + nms: float = 0.3, + object: Union[List[dict], None] = None, + ): + smach.StateMachine.__init__( + self, + outcomes=["succeeded", "failed"], + output_keys=["detections_3d", "object_dict"], + ) + + with self: + smach.StateMachine.add( + "DETECT_OBJECTS_3D", + Detect3DInArea( + depth_topic=depth_topic, + model=model, + filter=filter, + confidence=confidence, + nms=nms, + ), + transitions={"succeeded": "COUNTOBJECTS", "failed": "failed"}, + ) + + smach.StateMachine.add( + "COUNTOBJECTS", + self.CountObjectTypes(area_polygon), + transitions={"succeeded": "COUNTCATEGORY", "failed": "failed"}, + ) + smach.StateMachine.add( + "COUNTCATEGORY", + self.CountCategory(object_weight=object), + transitions={"succeeded": "GETWEIGHT", "failed": "failed"}, + ) + smach.StateMachine.add( + "GETWEIGHT", + self.ObjectWeight(object_weight=object), + transitions={"succeeded": "GETSIZE", "failed": "failed"}, + ) + smach.StateMachine.add( + "GETSIZE", + self.ObjectSize(), + transitions={"succeeded": "succeeded", "failed": "failed"}, + ) + + +if __name__ == "__main__": + import rospy + from sensor_msgs.msg import PointCloud2 + + rospy.init_node("test_object_comparison") + weight = rospy.get_param("/Object_list/Object") + + polygon = Polygon([[-1, 0], [1, 0], [0, 1], [1, 1]]) + sm = ObjectComparison(Polygon(), filter=["bottle", "cup", "cola"], object=weight) + sm.userdata.pcl_msg = rospy.wait_for_message( + "/xtion/depth_registered/points", PointCloud2 + ) + sm.execute()