Skip to content

Commit

Permalink
feat: Refactor SimCamera and SimDynamixelMotorsBus classes for improv…
Browse files Browse the repository at this point in the history
…ed organization and readability
  • Loading branch information
perezjln committed Aug 20, 2024
1 parent c5a8528 commit c81bbd4
Showing 1 changed file with 73 additions and 34 deletions.
107 changes: 73 additions & 34 deletions gym_lowcostrobot/envs/teleoperation_record.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import time
import numpy as np
from tqdm import tqdm

import mujoco
import mujoco.viewer
Expand All @@ -14,6 +15,7 @@
import numpy as np
import torch
from datasets import Dataset, Features, Sequence, Value

from lerobot.common.datasets.compute_stats import compute_stats
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, DATA_DIR, LeRobotDataset
from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, save_images_concurrently
Expand All @@ -22,7 +24,9 @@
)
from lerobot.common.datasets.video_utils import VideoFrame, encode_video_frames
from lerobot.scripts.push_dataset_to_hub import push_meta_data_to_hub, push_videos_to_hub, save_meta_data
from tqdm import tqdm
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
from lerobot.common.robot_devices.robots.koch import KochRobot



### SimCamera classes
Expand Down Expand Up @@ -145,6 +149,12 @@ def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] |
"""Inverse of `apply_calibration`."""
return values


## read the motor values from the mujoco environment
# motor_models: The motor models to read
# motor_ids: The motor ids to read
# data_name: The data name to read

def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
return_list = True
if not isinstance(motor_ids, list):
Expand All @@ -160,6 +170,9 @@ def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
else:
return values[0]

## read the motor values from the mujoco environment
# data_name: The data name to read
# motor_names: The motor names to read

def read(self, data_name, motor_names: str | list[str] | None = None):

Expand All @@ -179,6 +192,11 @@ def read(self, data_name, motor_names: str | list[str] | None = None):
return np.asarray(values)


## write the motor values to the mujoco environment
# data_name: The data name to write
# values: The values to write
# motor_names: The motor names to write

def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if not self.is_connected:
raise SimRobotDeviceNotConnectedError(
Expand All @@ -187,6 +205,13 @@ def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
for idx, value in zip(motor_ids, values):
self.data.qpos[idx-6-1] = value


## convert the real robot joint positions to mujoco joint positions
# with support for inverted joints
# real_positions: Joint positions in degrees
# transforms: List of transforms to apply to each joint
# oppose: List of oppositions to apply to each joint

@staticmethod
def real_to_mujoco(real_positions, transforms, oppose):
"""
Expand All @@ -213,13 +238,12 @@ def real_to_mujoco(real_positions, transforms, oppose):

def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):

#print(data_name, values, motor_names)

if not self.is_connected:
raise SimRobotDeviceNotConnectedError(
f"SimDynamixelMotorsBus({self.path_scene}) is not connected. You need to run `motors_bus.connect()`."
)

## do not write the following data for simulation motors so far
if data_name in ["Torque_Enable", "Operating_Mode", "Homing_Offset", "Drive_Mode", "Position_P_Gain", "Position_I_Gain", "Position_D_Gain"]:
return

Expand All @@ -232,6 +256,7 @@ def write(self, data_name, values: int | float | np.ndarray, motor_names: str |
0],
oppose=[-1,1,-1,1,-1,-1])

## update the mujoco environment
mujoco.mj_step(follower.model, follower.data)
viewer.sync()

Expand All @@ -249,19 +274,7 @@ def __del__(self):
self.disconnect()



import argparse
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
from lerobot.common.robot_devices.robots.koch import KochRobot

def busy_wait(seconds):
# Significantly more accurate than `time.sleep`, and mendatory for our use case,
# but it consumes CPU cycles.
# TODO(rcadene): find an alternative: from python 11, time.sleep is precise
end_time = time.perf_counter() + seconds
while time.perf_counter() < end_time:
pass

## test the leader motors reading
def test_read_leader_position():

leader = DynamixelMotorsBus(
Expand All @@ -280,15 +293,25 @@ def test_read_leader_position():
leader.connect()
while True:
print(leader.read("Present_Position",
["shoulder_pan", "shoulder_lift", "elbow_flex",
"wrist_flex", "wrist_roll", "gripper"]))
["shoulder_pan", "shoulder_lift",
"elbow_flex", "wrist_flex",
"wrist_roll", "gripper"]))

leader.disconnect()

## global variables
current_motor_ids=1
stop_episode = False
stop_record = False


## callback function for the keyboard control over mujoco environment
# [1-6] to select the current controlled motor using the keyboard
# [8] to increase the current motor position
# [9] to decrease the current motor position
# [7] to stop the episode
# [space] to stop the recording

def key_callback(keycode):

global current_motor_ids
Expand Down Expand Up @@ -326,6 +349,7 @@ def key_callback(keycode):
viewer.sync()


## function to replace the cube in the mujoco environment
def mujoco_replace_cube(model, data):
cube_low = np.array([-0.15, 0.10, 0.015])
cube_high = np.array([0.15, 0.25, 0.015])
Expand All @@ -334,15 +358,17 @@ def mujoco_replace_cube(model, data):
data.qpos[0:7] = np.concatenate([cube_pos, cube_rot])
mujoco.mj_forward(model, data)


if __name__ == "__main__":

parser = argparse.ArgumentParser()

parser.add_argument("--leader-port", type=str, default="/dev/ttyACM0", help="Port for the leader motors")
parser.add_argument("--calibration-path", type=str, default=".cache/calibration/koch.pkl", help="Path to the robots calibration file")
parser.add_argument("--test-leader", action="store_true", help="Test the leader motors")
parser.add_argument("--mujoco-replace-cube", action="store_true", help="Replace the cube in the mujoco environment")


parser.add_argument("--num-episodes", type=int, default=1)
parser.add_argument("--num-frames", type=int, default=20)
parser.add_argument("--num-workers", type=int, default=8)
parser.add_argument("--keep-last", action="store_true")
Expand All @@ -362,7 +388,6 @@ def mujoco_replace_cube(model, data):
args = parser.parse_args()

repo_id = args.repo_id
num_episodes = args.num_episodes
num_frames = args.num_frames
revision = args.revision
fps = args.fps
Expand All @@ -373,7 +398,8 @@ def mujoco_replace_cube(model, data):
test_read_leader_position()
exit()

## test the teleoperation

## create the leader and follower motors
leader = DynamixelMotorsBus(
port=args.leader_port,
motors={
Expand All @@ -400,11 +426,13 @@ def mujoco_replace_cube(model, data):
},
)

## create cameras which are instantiated to the mujoco environment in the simulated follower robot class
cameras = {
"image_top": SimCamera(id_camera="camera_top", model=follower.model, data=follower.data, camera_index=0, fps=30, width=640, height=480),
"image_front": SimCamera(id_camera="camera_front", model=follower.model, data=follower.data, camera_index=1, fps=30, width=640, height=480),
}

## define the path to store the data
DATA_DIR = pathlib.Path("data_traces")
out_data = DATA_DIR / repo_id

Expand All @@ -422,13 +450,15 @@ def mujoco_replace_cube(model, data):
os.makedirs(videos_dir, exist_ok=True)


## Define the episode data index and dictionaries to store the data
ep_dicts = []
episode_data_index = {"from": [], "to": []}
ep_fps = []
id_from = 0
id_to = 0

### start the teleoperation

## start the mujoco environment and start the teleoperation
ep_idx = 0
with mujoco.viewer.launch_passive(follower.model, follower.data, key_callback=key_callback) as viewer:

Expand All @@ -441,14 +471,15 @@ def mujoco_replace_cube(model, data):
while stop_record == False:

# sample the initial position of the cube
mujoco_replace_cube(follower.model, follower.data)
if args.mujoco_replace_cube:
mujoco_replace_cube(follower.model, follower.data)

# Instantiate the episode data storage
obs_replay = {}
obs_replay["observation"] = []
obs_replay["action"] = []
obs_replay["image_top"] = []
obs_replay["image_front"] = []

timestamps = []
start_time = time.time()

Expand All @@ -471,7 +502,7 @@ def mujoco_replace_cube(model, data):
# os.system(f'spd-say "saving episode"')
ep_dict = {}

# store images in png and create the video
# store the images of the episode in .png and create the video
for img_key in ["image_top", "image_front"]:
save_images_concurrently(
obs_replay[img_key],
Expand All @@ -483,6 +514,7 @@ def mujoco_replace_cube(model, data):
# store the reference to the video frame
ep_dict[f"observation.{img_key}"] = [{"path": f"videos/{fname}", "timestamp": tstp} for tstp in timestamps]

# store the state, action, episode index, frame index, timestamp and next done
state = torch.tensor(np.array(obs_replay["observation"]))
action = torch.tensor(np.array(obs_replay["action"]))
next_done = torch.zeros(num_frames, dtype=torch.bool)
Expand All @@ -497,19 +529,22 @@ def mujoco_replace_cube(model, data):
ep_fps.append(num_frames / timestamps[-1])
print(f"Episode {ep_idx} done, fps: {ep_fps[-1]:.2f}")

## store the episode data index
episode_data_index["from"].append(id_from)
episode_data_index["to"].append(id_from + num_frames if args.keep_last else id_from + num_frames - 1)

## update the episode data index
id_to = id_from + num_frames if args.keep_last else id_from + num_frames - 1
id_from = id_to

## store the episode data in the overall data list
ep_dicts.append(ep_dict)
ep_idx += 1

### end the teleoperation
## end the teleoperation
robot.disconnect()

# os.system('spd-say "encode video frames"')
## encode the images to videos for all the episodes
for idx in range(ep_idx):
for img_key in ["image_top", "image_front"]:
encode_video_frames(
Expand All @@ -518,11 +553,11 @@ def mujoco_replace_cube(model, data):
ep_fps[idx],
)

#os.system('spd-say "concatenate episodes"')
## concatenate the episodes and store the data
data_dict = concatenate_episodes(ep_dicts) # Since our fps varies we are sometimes off tolerance for the last frame

## store the data in the dataset format in a features dictionary
features = {}

keys = [key for key in data_dict if "observation.image_" in key]
for key in keys:
features[key.replace("observation.image_", "observation.images.")] = VideoFrame()
Expand All @@ -536,17 +571,19 @@ def mujoco_replace_cube(model, data):
features["timestamp"] = Value(dtype="float32", id=None)
features["next.done"] = Value(dtype="bool", id=None)
features["index"] = Value(dtype="int64", id=None)


## store the data in the dataset format
hf_dataset = Dataset.from_dict(data_dict, features=Features(features))
hf_dataset.set_transform(hf_transform_to_torch)

## store the meta data
info = {
#"fps": sum(ep_fps) / len(ep_fps), # to have a good tolerance in data processing for the slowest video
"fps": 24, # to have a good tolerance in data processing for the slowest video
"video": ep_idx,
}

#os.system('spd-say "from preloaded"')
## store the data in the LeRobotDataset format
lerobot_dataset = LeRobotDataset.from_preloaded(
repo_id=repo_id,
hf_dataset=hf_dataset,
Expand All @@ -555,15 +592,17 @@ def mujoco_replace_cube(model, data):
videos_dir=videos_dir,
)

#os.system('spd-say "compute stats"')
## compute the stats and save the meta data
stats = compute_stats(lerobot_dataset, num_workers=args.num_workers)
save_meta_data(info, stats, episode_data_index, meta_data_dir)

#os.system('spd-say "save to disk"')

## save the data in the dataset format in disk
hf_dataset = hf_dataset.with_format(None) # to remove transforms that cant be saved
hf_dataset.save_to_disk(str(out_data / "train"))

args.push_to_hub = True

## push the data to the hub
if args.push_to_hub:

repo_name = f"{repo_id}/lowcostrobot-mujoco-pickplace"
Expand Down

0 comments on commit c81bbd4

Please sign in to comment.