Commit
Merge pull request #24 from Ke-Wang1017/main
Create scripted policy to automatically generate demos for LiftCube Env
perezjln authored Jun 18, 2024
2 parents 0f5898b + 5d2b950 commit a81f2d5
Showing 6 changed files with 217 additions and 52 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -125,7 +125,7 @@ Currently, our todo list is:
- [ ] Provide reward shaping functions, taking inspiration from Meta-World.
- [ ] Implement the real-world interface: a seamless interface with real-world observations and motors.
- [ ] Improve the fidelity of the simulation.

- [ ] Simplify the gripper mesh for a more stable grasp.
## License

This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details.
137 changes: 137 additions & 0 deletions examples/scripted_policy.py
@@ -0,0 +1,137 @@
import gymnasium as gym
import gym_lowcostrobot # Import the low-cost robot environments
import numpy as np
import mujoco


def displace_object(env, square_size=0.15, invert_y=False, origin_pos=(0.0, 0.1, 0.01)):
# Sample a cube position within a square in front of the robot; origin_pos is the (x, y, z) centre of the square, with z the cube's resting height
if not invert_y:
x = np.random.uniform(origin_pos[0] - square_size / 2, origin_pos[0] + square_size / 2)
y = np.random.uniform(origin_pos[1] - square_size / 2, origin_pos[1] + square_size / 2)
else:
x = np.random.uniform(origin_pos[0] + square_size / 2, origin_pos[0] - square_size / 2)
y = np.random.uniform(origin_pos[1] + square_size / 2, origin_pos[1] - square_size / 2)
env.data.qpos[:3] = np.array([x, y, origin_pos[2]])
return env.data.qpos[:3]


class BasePolicy:
def __init__(self, inject_noise=False, init_pose=None, meet_pose=None):
self.inject_noise = inject_noise
self.step_count = 0
self.trajectory = None
self.right_trajectory = None
self.init_pose = init_pose
self.meet_pose = meet_pose

def generate_trajectory(self, init_pose, meet_pose):
raise NotImplementedError

@staticmethod
def interpolate(curr_waypoint, next_waypoint, t):
t_frac = (t - curr_waypoint["t"]) / (next_waypoint["t"] - curr_waypoint["t"])
curr_xyz = curr_waypoint['xyz']
curr_quat = curr_waypoint['quat']
curr_grip = curr_waypoint['gripper']
next_xyz = next_waypoint['xyz']
next_quat = next_waypoint['quat']
next_grip = next_waypoint['gripper']
xyz = curr_xyz + (next_xyz - curr_xyz) * t_frac
quat = curr_quat + (next_quat - curr_quat) * t_frac
gripper = curr_grip + (next_grip - curr_grip) * t_frac
return xyz, quat, gripper

def __call__(self):
# generate trajectory at first timestep, then open-loop execution
if self.step_count == 0:
self.generate_trajectory(self.init_pose, self.meet_pose)

# obtain arm waypoints
if self.trajectory[0]['t'] == self.step_count:
self.curr_waypoint = self.trajectory.pop(0)
next_waypoint = self.trajectory[0]

# interpolate between waypoints to obtain current pose and gripper command
xyz, quat, gripper = self.interpolate(self.curr_waypoint, next_waypoint, self.step_count)

# Inject noise
if self.inject_noise:
scale = 0.01
xyz = xyz + np.random.uniform(-scale, scale, xyz.shape)

action = np.concatenate([xyz, quat, [gripper]])

self.step_count += 1
return action


class LiftCubePolicy(BasePolicy):

def generate_trajectory(self, init_pose, meet_pose):

# box_info = np.array(ts_first.observation['env_state'])
# box_xyz = box_info[:3]
# box_quat = box_info[3:]
# print(f"Generate trajectory for {box_xyz=}")

# gripper_pick_quat = Quaternion(init_mocap_pose_right[3:])
# gripper_pick_quat = gripper_pick_quat * Quaternion(axis=[0.0, 1.0, 0.0], degrees=-60)

meet_xyz = meet_pose[:3]

self.trajectory = [
{"t": 0, "xyz": init_pose[:3], "quat": init_pose[3:], "gripper": 0}, # sleep
{"t": 800, "xyz": meet_xyz + np.array([0.0, 0.01, 0.075]), "quat": meet_pose[3:], "gripper": -1.5}, # approach meet position
{"t": 1100, "xyz": meet_xyz + np.array([0.0, 0.01, 0.02]), "quat": meet_pose[3:], "gripper": -1.5}, # move to meet position
{"t": 1350, "xyz": meet_xyz + np.array([0.0, 0.01, 0.02]), "quat": meet_pose[3:], "gripper": -0.25}, # close gripper
{"t": 1490, "xyz": meet_xyz + np.array([0.0, 0.01, 0.1]), "quat": meet_pose[3:], "gripper": -0.25}, # lift up
{"t": 1500, "xyz": meet_xyz + np.array([0.0, 0.01, 0.1]), "quat": meet_pose[3:], "gripper": -0.25}, # stay
]


def test_policy(task_name):

# setup the environment
if 'LiftCube' in task_name:
env = gym.make("LiftCube-v0", render_mode="human", action_mode="ee")
# other tasks can be added here
else:
raise NotImplementedError

NUM_EPISODES = 5
cube_origin_pos = [0.03390873, 0.22571199, 0.04]

for episode_idx in range(NUM_EPISODES):
observation, info = env.reset()
cube_pos = displace_object(env.unwrapped, square_size=0.1, invert_y=False, origin_pos=cube_origin_pos)
# cube_pos = env.unwrapped.data.qpos[:3].astype(np.float32)
ee_id = env.unwrapped.model.body("moving_side").id
ee_pos = env.unwrapped.data.xpos[ee_id].astype(np.float32) # default [0.03390873 0.22571199 0.14506643]
ee_orn = np.zeros(4, dtype=np.float64)
mujoco.mju_mat2Quat(ee_orn, env.unwrapped.data.xmat[ee_id])
# keep orientation constant
init_pose = np.concatenate([ee_pos, ee_orn])
meet_pose = np.concatenate([cube_pos, ee_orn])
policy = LiftCubePolicy(init_pose=init_pose, meet_pose=meet_pose)
episode_length = 1500
for i in range(episode_length):
action = env.action_space.sample()  # sampled only to get an array of the right shape; overwritten below
result = policy()
ee_pos = env.unwrapped.data.xpos[ee_id].astype(np.float32)
action[:3] = result[:3] - ee_pos
action[3] = result[7]
# Step the environment
observation, reward, terminated, truncated, info = env.step(action)

# Reset the environment if it's done
if terminated or truncated:
observation, info = env.reset()
break


if __name__ == '__main__':
test_task_name = 'LiftCube'
test_policy(test_task_name)
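For convenience, the scripted policy above can also be driven outside of test_policy to record demonstrations. A minimal sketch follows; the import path, buffer layout, and output file name are illustrative and not part of this PR:

import gymnasium as gym
import numpy as np
import mujoco
import gym_lowcostrobot
from scripted_policy import LiftCubePolicy, displace_object  # assumes examples/ is on PYTHONPATH

env = gym.make("LiftCube-v0", render_mode=None, action_mode="ee")
observation, info = env.reset()
cube_pos = displace_object(env.unwrapped, square_size=0.1, origin_pos=[0.03390873, 0.22571199, 0.04])
ee_id = env.unwrapped.model.body("moving_side").id
ee_pos = env.unwrapped.data.xpos[ee_id].astype(np.float32)
ee_orn = np.zeros(4)
mujoco.mju_mat2Quat(ee_orn, env.unwrapped.data.xmat[ee_id])
policy = LiftCubePolicy(init_pose=np.concatenate([ee_pos, ee_orn]),
                        meet_pose=np.concatenate([cube_pos, ee_orn]))

actions, rewards = [], []
for _ in range(1500):
    result = policy()
    action = env.action_space.sample()  # sampled only to obtain an array of the right shape
    ee_pos = env.unwrapped.data.xpos[ee_id].astype(np.float32)
    action[:3] = result[:3] - ee_pos    # end-effector displacement toward the scripted waypoint
    action[3] = result[7]               # gripper command
    observation, reward, terminated, truncated, info = env.step(action)
    actions.append(action)
    rewards.append(reward)
    if terminated or truncated:
        break

np.savez("liftcube_demo.npz", actions=np.array(actions), rewards=np.array(rewards))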
2 changes: 1 addition & 1 deletion gym_lowcostrobot/__init__.py
@@ -9,7 +9,7 @@
register(
id="LiftCube-v0",
entry_point="gym_lowcostrobot.envs:LiftCubeEnv",
max_episode_steps=500,
max_episode_steps=1500,
)

register(
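With this change, the TimeLimit wrapper truncates LiftCube-v0 episodes after 1500 steps, matching the 1500-step trajectory used by the scripted policy. A quick check, assuming the package is installed:

import gymnasium as gym
import gym_lowcostrobot

env = gym.make("LiftCube-v0")
print(env.spec.max_episode_steps)  # 1500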
6 changes: 3 additions & 3 deletions gym_lowcostrobot/assets/low_cost_robot_6dof/lift_cube.xml
@@ -1,7 +1,7 @@
<mujoco model="bd1 scene">
<option timestep="0.005"/>
<option timestep="0.005" o_solimp=".89 .91 .01 0.5 2" o_solref=".01 1" />

<compiler angle="radian" autolimits="true"/>
<compiler angle="radian" autolimits="true" convexhull="false"/>

<asset>
<mesh name="base" file="base.stl"/>
@@ -38,7 +38,7 @@
<body name="cube" pos="0.1 0.1 0.01">
<freejoint name="cube"/>
<inertial pos="0 0 0" mass="0.1" diaginertia="0.00001125 0.00001125 0.00001125"/>
<geom friction="0.5" condim="3" pos="0 0 0" size="0.015 0.015 0.015" type="box" name="cube" rgba="0.5 0 0 1" priority="1"/>
<geom friction="2.0 0.8 0.8" condim="4" pos="0 0 0" size="0.01 0.01 0.01" type="box" name="cube" rgba="0.5 0 0 1" priority="1"/>
</body>


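The cube is now smaller, with higher friction and contact-override options on the scene. A small sanity check after loading the model with the MuJoCo Python bindings (the relative path assumes the repository root as working directory):

import mujoco

model = mujoco.MjModel.from_xml_path("gym_lowcostrobot/assets/low_cost_robot_6dof/lift_cube.xml")
print(model.geom("cube").size)      # half-extents, now 0.01 on each axis
print(model.geom("cube").friction)  # sliding/torsional/rolling friction, now 2.0 0.8 0.8
print(model.opt.o_solref, model.opt.o_solimp)  # contact override parameters from <option>

Note that MuJoCo applies o_solref/o_solimp only when the contact override flag is enabled in the model options.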
110 changes: 72 additions & 38 deletions gym_lowcostrobot/envs/lift_cube_env.py
@@ -109,11 +109,17 @@ def __init__(self, observation_mode="image", action_mode="joint", render_mode=No
self.rgb_array_renderer = mujoco.Renderer(self.model, height=640, width=640)

# Set additional utils
self.done = False # termination flag
self.success = False # success flag
self.threshold_height = 0.5
self.cube_low = np.array([-0.15, 0.10, 0.015])
self.cube_high = np.array([0.15, 0.25, 0.015])
self.target_low = np.array([-3.14159, -1.5708, -1.48353, -1.91986, -2.96706, -1.74533])
self.target_high = np.array([3.14159, 1.22173, 1.74533, 1.91986, 2.96706, 0.0523599])
self.q0 = (self.target_high + self.target_low) / 2 # home position
self.q0[3] += 1.57

def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="moving_side", nb_dof=6, regularization=1e-6):
def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="moving_side", nb_dof=6, regularization=1e-6, home_position=None, nullspace_weight=1.):
"""
Computes the inverse kinematics for a robotic arm to reach the target end effector position.
@@ -122,45 +128,66 @@ def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="moving_side",
:param joint_name: str, name of the end effector joint
:param nb_dof: int, number of degrees of freedom
:param regularization: float, regularization factor for the pseudoinverse computation
:param home_position: numpy array of home joint positions to regularize towards
:param nullspace_weight: float, weight for the nullspace regularization
:return: numpy array of target joint positions
"""
if home_position is None:
home_position = np.zeros(nb_dof) # Default to zero if no home position is provided

try:
# Get the joint ID from the name
joint_id = self.model.body(joint_name).id
except KeyError:
raise ValueError(f"Body name '{joint_name}' not found in the model.")


ERROR_TOLERANCE = 1e-2
MAX_ITERATIONS = 10
i = 0
# Get the current end effector position
# ee_pos = self.d.geom_xpos[joint_id]
ee_id = self.model.body(joint_name).id
ee_pos = self.data.geom_xpos[ee_id]

# Compute the Jacobian
ee_pos = self.data.xpos[joint_id]
error = ee_target_pos - ee_pos
jac = np.zeros((3, self.model.nv))
mujoco.mj_jacBodyCom(self.model, self.data, jac, None, joint_id)

# Compute the difference between target and current end effector positions
delta_pos = ee_target_pos - ee_pos

# Compute the pseudoinverse of the Jacobian with regularization
jac_reg = jac[:, :nb_dof].T @ jac[:, :nb_dof] + regularization * np.eye(nb_dof)
jac_pinv = np.linalg.inv(jac_reg) @ jac[:, :nb_dof].T

# Compute target joint velocities
qdot = jac_pinv @ delta_pos

# Normalize joint velocities to avoid excessive movements
qdot_norm = np.linalg.norm(qdot)
if qdot_norm > 1.0:
qdot /= qdot_norm

# Read the current joint positions
qpos = self.data.qpos[:nb_dof]

# Compute the new joint positions
q_target_pos = qpos + qdot * step

return q_target_pos
q_pos = self.data.qpos[7:13].copy()
Kn = np.ones(nb_dof) * nullspace_weight

while np.linalg.norm(error) > ERROR_TOLERANCE and i < MAX_ITERATIONS:
# Compute the Jacobian
# mujoco.mj_step(self.model, self.data)
mujoco.mj_forward(self.model, self.data)
mujoco.mj_jac(self.model, self.data, jac, None, ee_target_pos, joint_id)
ee_pos = self.data.xpos[joint_id].astype(np.float32)

# Compute the difference between target and current end effector positions
error = ee_target_pos - ee_pos

# Compute the pseudoinverse of the Jacobian with damping, nv has 12 values
jac_reg = jac[:, 6:12].T @ jac[:, 6:12] + regularization * np.eye(nb_dof)
jac_pinv = np.linalg.inv(jac_reg) @ jac[:, 6:12].T

# Compute target joint velocities
qdot = jac_pinv @ error
# keep the joints close to the home position; otherwise the arm keeps drifting even after the target is reached
qdot += (np.eye(nb_dof) - np.linalg.pinv(jac[:, 6:12]) @ jac[:, 6:12]) @ (Kn * (home_position - q_pos))


# Normalize joint velocities to avoid excessive movements
qdot_norm = np.linalg.norm(qdot)
if qdot_norm > 1.0:
qdot /= qdot_norm

# Update the joint positions
self.data.qpos[7:13] += qdot * step
i += 1
q_target_pos = self.data.qpos[7:13].copy()
self.data.qpos[7:13] = q_pos
if i == MAX_ITERATIONS:
print("Inverse kinematics did not converge")
print(f"Error: {error}")
else:
print(f"Inverse kinematics converged in {i} iterations")
return q_target_pos


def apply_action(self, action):
"""
@@ -179,12 +206,10 @@ def apply_action(self, action):
ee_target_pos = self.data.xpos[ee_id] + ee_action

# Use inverse kinematics to get the joint action wrt the end effector current position and displacement
target_qpos = self.inverse_kinematics(ee_target_pos=ee_target_pos)
target_qpos = self.inverse_kinematics(ee_target_pos=ee_target_pos, joint_name="moving_side", home_position=self.q0, step=0.05)
target_qpos[-1:] = gripper_action
elif self.action_mode == "joint":
target_low = np.array([-3.14159, -1.5708, -1.48353, -1.91986, -2.96706, -1.74533])
target_high = np.array([3.14159, 1.22173, 1.74533, 1.91986, 2.96706, 0.0523599])
target_qpos = action * (target_high - target_low) / 2 + (target_high + target_low) / 2
target_qpos = action * (self.target_high - self.target_low) / 2 + self.q0
else:
raise ValueError("Invalid action mode, must be 'ee' or 'joint'")

@@ -221,6 +246,8 @@ def reset(self, seed=None, options=None):
cube_rot = np.array([1.0, 0.0, 0.0, 0.0])
robot_qpos = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
self.data.qpos[:] = np.concatenate([cube_pos, cube_rot, robot_qpos])
self.done = False
self.success = False

# Step the simulation
mujoco.mj_forward(self.model, self.data)
@@ -238,14 +265,21 @@ def step(self, action):
cube_pos = self.data.qpos[:3]
cube_z = cube_pos[2]
ee_id = self.model.body("moving_side").id
ee_pos = self.data.geom_xpos[ee_id]
ee_pos = self.data.xpos[ee_id]
ee_to_cube = np.linalg.norm(ee_pos - cube_pos)
print(f"Cube position: {cube_pos}, EE position: {ee_pos}, Distance: {ee_to_cube}")

# Compute the reward
reward_height = cube_z - self.threshold_height
reward_distance = -ee_to_cube
reward = reward_height + reward_distance
return observation, reward, False, False, {}
# reward = reward_height + reward_distance
reward = reward_distance
if ee_to_cube < 0.049:
self.done = True
self.success = True
info = {}
info["success"] = self.success
return observation, reward, self.done, False, info

def render(self):
if self.render_mode == "human":
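The rewritten solver is, in effect, iterated damped least squares with a nullspace term that pulls the arm toward its home configuration, so the joints stop drifting once the end-effector target is reached. A minimal standalone sketch of a single update (function name and defaults are illustrative; jac is the 3×n positional Jacobian of the arm joints):

import numpy as np

def dls_nullspace_step(jac, error, q, q_home, damping=1e-6, nullspace_weight=1.0, step=0.05):
    """One damped-least-squares IK step with a nullspace bias toward a home pose."""
    n = jac.shape[1]
    # Damped pseudoinverse: (J^T J + lambda I)^-1 J^T
    jac_pinv = np.linalg.inv(jac.T @ jac + damping * np.eye(n)) @ jac.T
    qdot = jac_pinv @ error
    # Nullspace projector (I - J^+ J) steers the joints toward q_home without
    # affecting the end-effector task.
    qdot += (np.eye(n) - np.linalg.pinv(jac) @ jac) @ (nullspace_weight * (q_home - q))
    # Cap the step to avoid large joint jumps
    norm = np.linalg.norm(qdot)
    if norm > 1.0:
        qdot /= norm
    return q + step * qdot

In the environment this update is repeated up to MAX_ITERATIONS times, with mujoco.mj_forward and mujoco.mj_jac re-evaluating the end-effector position and Jacobian before each step.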
12 changes: 3 additions & 9 deletions gym_lowcostrobot/envs/reach_cube_env.py
@@ -175,12 +175,7 @@ def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="moving_side",
if qdot_norm > 1.0:
qdot /= qdot_norm

# Read the current joint positions
# qpos = self.data.qpos[7:13]

# Compute the new joint positions
# q_target_pos = qpos + qdot * step

# Update the joint positions
self.data.qpos[7:13] += qdot * step
i += 1
q_target_pos = self.data.qpos[7:13].copy()
@@ -191,7 +186,7 @@ def inverse_kinematics(self, ee_target_pos, step=0.2, joint_name="moving_side",
else:
print(f"Inverse kinematics converged in {i} iterations")
return q_target_pos


def apply_action(self, action):
"""
@@ -213,8 +208,7 @@ def apply_action(self, action):
target_qpos = self.inverse_kinematics(ee_target_pos=ee_target_pos, joint_name="moving_side", home_position=self.q0, step=0.05)
target_qpos[-1:] = gripper_action
elif self.action_mode == "joint":

target_qpos = action * (self.target_high - self.target_low) / 2 + (self.target_high + self.target_low) / 2
target_qpos = action * (self.target_high - self.target_low) / 2 + self.q0
else:
raise ValueError("Invalid action mode, must be 'ee' or 'joint'")

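In joint mode, both environments now recentre the normalized action on the home pose q0 instead of the midpoint of the joint range. A small worked example with the limits defined above:

import numpy as np

target_low = np.array([-3.14159, -1.5708, -1.48353, -1.91986, -2.96706, -1.74533])
target_high = np.array([3.14159, 1.22173, 1.74533, 1.91986, 2.96706, 0.0523599])
q0 = (target_high + target_low) / 2  # home position
q0[3] += 1.57                        # joint 3 home is offset from the range midpoint

action = np.zeros(6)                 # a zero action now commands the home pose exactly
target_qpos = action * (target_high - target_low) / 2 + q0
print(np.allclose(target_qpos, q0))  # True

Because q0[3] is shifted, an action of +1 on joint 3 maps above target_high[3]; clipping such targets is not part of this diff.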
