From 97908be86b3b14860539e0b5fe8d7e505474c242 Mon Sep 17 00:00:00 2001 From: Julien Perez Date: Thu, 20 Jun 2024 09:04:19 +0200 Subject: [PATCH] Implement tele-operation over gym environment with a physical leader arm from Alex. Introduce the interface module to regroup physical robot interaction The code as been adapted from the orginal repo of Alex Some prefix renaming of some example scripts --- examples/gym_test_ik.py | 44 +++ examples/lerobotdataset_load.py | 2 - examples/mujoco_simple_invk.py | 2 +- examples/simple.py | 15 - examples/teleoperate_simulated_robot.py | 54 +++ examples/test_ik.py | 41 --- gym_lowcostrobot/interface/dynamixel.py | 313 ++++++++++++++++++ gym_lowcostrobot/interface/robot.py | 175 ++++++++++ .../{ => interface}/simulated_robot.py | 0 9 files changed, 587 insertions(+), 59 deletions(-) create mode 100644 examples/gym_test_ik.py delete mode 100644 examples/simple.py create mode 100644 examples/teleoperate_simulated_robot.py delete mode 100644 examples/test_ik.py create mode 100644 gym_lowcostrobot/interface/dynamixel.py create mode 100644 gym_lowcostrobot/interface/robot.py rename gym_lowcostrobot/{ => interface}/simulated_robot.py (100%) diff --git a/examples/gym_test_ik.py b/examples/gym_test_ik.py new file mode 100644 index 0000000..5403b20 --- /dev/null +++ b/examples/gym_test_ik.py @@ -0,0 +1,44 @@ +import gymnasium as gym +import gym_lowcostrobot # Import the low-cost robot environments +import numpy as np + +def displace_object(square_size=0.15, invert_y=False, origin_pos=[0, 0.1]): + ### Sample a position in a square in front of the robot + if not invert_y: + x = np.random.uniform(origin_pos[0] - square_size / 2, origin_pos[0] + square_size / 2) + y = np.random.uniform(origin_pos[1] - square_size / 2, origin_pos[1] + square_size / 2) + else: + x = np.random.uniform(origin_pos[0] + square_size / 2, origin_pos[0] - square_size / 2) + y = np.random.uniform(origin_pos[1] + square_size / 2, origin_pos[1] - square_size / 2) + env.data.qpos[:3] = np.array([x, y, origin_pos[2]]) + return env.data.qpos[:3] + +if __name__ == "__main__": + + # Create the environment + env = gym.make("ReachCube-v0", render_mode="human", action_mode="ee") + + # Reset the environment + observation, info = env.reset() + cube_origin_pos = env.data.qpos[:3].astype(np.float32) + for i in range(10000): + if i % 500 == 0: + cube_pos = displace_object(square_size=0.2, invert_y=False, origin_pos=cube_origin_pos) + + # Sample random action + action = env.action_space.sample() + ee_id = env.model.body("moving_side").id + ee_pos = env.data.xpos[ee_id].astype(np.float32) # default [0.03390873 0.22571199 0.14506643] + action[:3] = cube_pos + [0,0,0.1] - ee_pos + # action[:3] = [0.03390873, 0.22571199, 0.14506643] + action[3] = -1.5 + + # Step the environment + observation, reward, terminted, truncated, info = env.step(action) + + # Reset the environment if it's done + if terminted or truncated: + observation, info = env.reset() + + # Close the environment + env.close() \ No newline at end of file diff --git a/examples/lerobotdataset_load.py b/examples/lerobotdataset_load.py index 07ba239..05a015b 100644 --- a/examples/lerobotdataset_load.py +++ b/examples/lerobotdataset_load.py @@ -5,8 +5,6 @@ from lerobot.common.datasets.lerobot_dataset import LeRobotDataset - - def main(): # You can easily load a dataset from a Hugging Face repository diff --git a/examples/mujoco_simple_invk.py b/examples/mujoco_simple_invk.py index a6adbca..2324092 100644 --- a/examples/mujoco_simple_invk.py +++ b/examples/mujoco_simple_invk.py @@ -5,7 +5,7 @@ import mujoco.viewer import numpy as np -from gym_lowcostrobot.simulated_robot import LevenbegMarquardtIK +from gym_lowcostrobot.interface.simulated_robot import LevenbegMarquardtIK def displace_object(data, m, object_id, viewer, square_size=0.2, invert_y=False, origin_pos=[0, 0.1]): diff --git a/examples/simple.py b/examples/simple.py deleted file mode 100644 index 097fec9..0000000 --- a/examples/simple.py +++ /dev/null @@ -1,15 +0,0 @@ -import gymnasium as gym - -import gym_lowcostrobot # noqa - -env = gym.make("LiftCube-v0", render_mode=None, observation_mode="state", action_mode="ee") - -env.reset() - -for _ in range(1000): - action = env.action_space.sample() - observation, reward, terminated, truncated, info = env.step(action) - print(observation, reward, terminated, truncated, info) - if terminated or truncated: - env.reset() -env.close() diff --git a/examples/teleoperate_simulated_robot.py b/examples/teleoperate_simulated_robot.py new file mode 100644 index 0000000..101efeb --- /dev/null +++ b/examples/teleoperate_simulated_robot.py @@ -0,0 +1,54 @@ +import time +import numpy as np +import threading +import mujoco.viewer + +from gym_lowcostrobot.interface.dynamixel import Dynamixel +from gym_lowcostrobot.interface.robot import Robot +from gym_lowcostrobot.interface.simulated_robot import SimulatedRobot + +def read_leader_position(): + global target_pos + while True: + target_pos = np.array(leader.read_position()) + target_pos = (target_pos / 2048 - 1) * 3.14 + target_pos[1] = -target_pos[1] + target_pos[3] = -target_pos[3] + target_pos[4] = -target_pos[4] + +if __name__ == '__main__': + + leader_dynamixel = Dynamixel.Config(baudrate=1_000_000, device_name='/dev/tty.usbmodem57380045631').instantiate() + leader = Robot(leader_dynamixel, servo_ids=[1, 2, 3, 6, 7]) + leader.set_trigger_torque() + + m = mujoco.MjModel.from_xml_path('gym_lowcostrobot/assets/low_cost_robot_6dof/reach_cube.xml') + d = mujoco.MjData(m) + r = SimulatedRobot(m, d) + + target_pos = np.zeros(5) + + # Start the thread for reading leader position + leader_thread = threading.Thread(target=read_leader_position) + leader_thread.daemon = True + leader_thread.start() + + with mujoco.viewer.launch_passive(m, d) as viewer: + start = time.time() + while viewer.is_running(): + # Use the latest target_pos + step_start = time.time() + target_pos_local = target_pos.copy() + # print(f'target pos copy {time.time() - step_start}') + r.set_target_pos(target_pos_local) + # print(f'set targtee pos copy {time.time() - step_start}') + mujoco.mj_step(m, d) + # print(f'mjstep {time.time() - step_start}') + viewer.sync() + # print(f'viewer sync {time.time() - step_start}') + + # Rudimentary time keeping, will drift relative to wall clock. + time_until_next_step = m.opt.timestep - (time.time() - step_start) + # print(f'time until next step {time_until_next_step}') + if time_until_next_step > 0: + time.sleep(time_until_next_step) \ No newline at end of file diff --git a/examples/test_ik.py b/examples/test_ik.py deleted file mode 100644 index d044f4a..0000000 --- a/examples/test_ik.py +++ /dev/null @@ -1,41 +0,0 @@ -import gymnasium as gym -import gym_lowcostrobot # Import the low-cost robot environments -import numpy as np - -def displace_object(square_size=0.15, invert_y=False, origin_pos=[0, 0.1]): - ### Sample a position in a square in front of the robot - if not invert_y: - x = np.random.uniform(origin_pos[0] - square_size / 2, origin_pos[0] + square_size / 2) - y = np.random.uniform(origin_pos[1] - square_size / 2, origin_pos[1] + square_size / 2) - else: - x = np.random.uniform(origin_pos[0] + square_size / 2, origin_pos[0] - square_size / 2) - y = np.random.uniform(origin_pos[1] + square_size / 2, origin_pos[1] - square_size / 2) - env.data.qpos[:3] = np.array([x, y, origin_pos[2]]) - return env.data.qpos[:3] - -# Create the environment -env = gym.make("ReachCube-v0", render_mode="human", action_mode="ee") - -# Reset the environment -observation, info = env.reset() -cube_origin_pos = env.data.qpos[:3].astype(np.float32) -for i in range(10000): - if i % 500 == 0: - cube_pos = displace_object(square_size=0.2, invert_y=False, origin_pos=cube_origin_pos) - # Sample random action - action = env.action_space.sample() - ee_id = env.model.body("moving_side").id - ee_pos = env.data.xpos[ee_id].astype(np.float32) # default [0.03390873 0.22571199 0.14506643] - action[:3] = cube_pos + [0,0,0.1] - ee_pos - # action[:3] = [0.03390873, 0.22571199, 0.14506643] - action[3] = -1.5 - - # Step the environment - observation, reward, terminted, truncated, info = env.step(action) - - # Reset the environment if it's done - if terminted or truncated: - observation, info = env.reset() - -# Close the environment -env.close() \ No newline at end of file diff --git a/gym_lowcostrobot/interface/dynamixel.py b/gym_lowcostrobot/interface/dynamixel.py new file mode 100644 index 0000000..9942504 --- /dev/null +++ b/gym_lowcostrobot/interface/dynamixel.py @@ -0,0 +1,313 @@ +from __future__ import annotations +import math +import os +from dynamixel_sdk import * # Uses Dynamixel SDK library +from dataclasses import dataclass +import enum + + +class ReadAttribute(enum.Enum): + TEMPERATURE = 146 + VOLTAGE = 145 + VELOCITY = 128 + POSITION = 132 + CURRENT = 126 + PWM = 124 + HARDWARE_ERROR_STATUS = 70 + HOMING_OFFSET = 20 + BAUDRATE = 8 + + +class OperatingMode(enum.Enum): + VELOCITY = 1 + POSITION = 3 + CURRENT_CONTROLLED_POSITION = 5 + PWM = 16 + UNKNOWN = -1 + + +class Dynamixel: + ADDR_TORQUE_ENABLE = 64 + ADDR_GOAL_POSITION = 116 + ADDR_VELOCITY_LIMIT = 44 + ADDR_GOAL_PWM = 100 + OPERATING_MODE_ADDR = 11 + POSITION_I = 82 + POSITION_P = 84 + ADDR_ID = 7 + + @dataclass + class Config: + def instantiate(self): return Dynamixel(self) + + baudrate: int = 57600 + protocol_version: float = 2.0 + device_name: str = '' # /dev/tty.usbserial-1120' + dynamixel_id: int = 1 + + def __init__(self, config: Config): + self.config = config + self.connect() + + def connect(self): + if self.config.device_name == '': + for port_name in os.listdir('/dev'): + if 'ttyUSB' in port_name or 'ttyACM' in port_name: + self.config.device_name = '/dev/' + port_name + print(f'using device {self.config.device_name}') + self.portHandler = PortHandler(self.config.device_name) + # self.portHandler.LA + self.packetHandler = PacketHandler(self.config.protocol_version) + if not self.portHandler.openPort(): + raise Exception(f'Failed to open port {self.config.device_name}') + + if not self.portHandler.setBaudRate(self.config.baudrate): + raise Exception(f'failed to set baudrate to {self.config.baudrate}') + + # self.operating_mode = OperatingMode.UNKNOWN + # self.torque_enabled = False + # self._disable_torque() + + self.operating_modes = [None for _ in range(32)] + self.torque_enabled = [None for _ in range(32)] + return True + + def disconnect(self): + self.portHandler.closePort() + + def set_goal_position(self, motor_id, goal_position): + # if self.operating_modes[motor_id] is not OperatingMode.POSITION: + # self._disable_torque(motor_id) + # self.set_operating_mode(motor_id, OperatingMode.POSITION) + + # if not self.torque_enabled[motor_id]: + # self._enable_torque(motor_id) + + # self._enable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + self.ADDR_GOAL_POSITION, goal_position) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set position of motor {motor_id} to {goal_position}') + + def set_pwm_value(self, motor_id: int, pwm_value, tries=3): + if self.operating_modes[motor_id] is not OperatingMode.PWM: + self._disable_torque(motor_id) + self.set_operating_mode(motor_id, OperatingMode.PWM) + + if not self.torque_enabled[motor_id]: + self._enable_torque(motor_id) + # print(f'enabling torque') + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.ADDR_GOAL_PWM, pwm_value) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set pwm of motor {motor_id} to {pwm_value}') + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + raise ConnectionError(f"dxl_comm_result: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + else: + print(f'dynamixel pwm setting failure trying again with {tries - 1} tries') + self.set_pwm_value(motor_id, pwm_value, tries=tries - 1) + elif dxl_error != 0: + print(f'dxl error {dxl_error}') + raise ConnectionError(f"dynamixel error: {self.packetHandler.getTxRxResult(dxl_error)}") + + def read_temperature(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.TEMPERATURE, 1) + + def read_velocity(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.VELOCITY, 4) + if pos > 2 ** 31: + pos -= 2 ** 32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.POSITION, 4) + if pos > 2 ** 31: + pos -= 2 ** 32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position_degrees(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 360 + + def read_position_radians(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 2 * math.pi + + def read_current(self, motor_id: int): + current = self._read_value(motor_id, ReadAttribute.CURRENT, 2) + if current > 2 ** 15: + current -= 2 ** 16 + return current + + def read_present_pwm(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.PWM, 2) + + def read_hardware_error_status(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.HARDWARE_ERROR_STATUS, 1) + + def disconnect(self): + self.portHandler.closePort() + + def set_id(self, old_id, new_id, use_broadcast_id: bool = False): + """ + sets the id of the dynamixel servo + @param old_id: current id of the servo + @param new_id: new id + @param use_broadcast_id: set ids of all connected dynamixels if True. + If False, change only servo with self.config.id + @return: + """ + if use_broadcast_id: + current_id = 254 + else: + current_id = old_id + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, + current_id, self.ADDR_ID, new_id) + self._process_response(dxl_comm_result, dxl_error, old_id) + self.config.id = id + + def _enable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + self.ADDR_TORQUE_ENABLE, 1) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = True + + def _disable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + self.ADDR_TORQUE_ENABLE, 0) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = False + + def _process_response(self, dxl_comm_result: int, dxl_error: int, motor_id: int): + if dxl_comm_result != COMM_SUCCESS: + raise ConnectionError( + f"dxl_comm_result for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + elif dxl_error != 0: + print(f'dxl error {dxl_error}') + raise ConnectionError( + f"dynamixel error for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_error)}") + + def set_operating_mode(self, motor_id: int, operating_mode: OperatingMode): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.OPERATING_MODE_ADDR, operating_mode.value) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.operating_modes[motor_id] = operating_mode + + def set_pwm_limit(self, motor_id: int, limit: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + 36, limit) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_velocity_limit(self, motor_id: int, velocity_limit): + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + self.ADDR_VELOCITY_LIMIT, velocity_limit) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_P(self, motor_id: int, P: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.POSITION_P, P) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_I(self, motor_id: int, I: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.POSITION_I, I) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def read_home_offset(self, motor_id: int): + self._disable_torque(motor_id) + # dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + # ReadAttribute.HOMING_OFFSET.value, home_position) + home_offset = self._read_value(motor_id, ReadAttribute.HOMING_OFFSET, 4) + # self._process_response(dxl_comm_result, dxl_error) + self._enable_torque(motor_id) + return home_offset + + def set_home_offset(self, motor_id: int, home_position: int): + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + ReadAttribute.HOMING_OFFSET.value, home_position) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self._enable_torque(motor_id) + + def set_baudrate(self, motor_id: int, baudrate): + # translate baudrate into dynamixel baudrate setting id + if baudrate == 57600: + baudrate_id = 1 + elif baudrate == 1_000_000: + baudrate_id = 3 + elif baudrate == 2_000_000: + baudrate_id = 4 + elif baudrate == 3_000_000: + baudrate_id = 5 + elif baudrate == 4_000_000: + baudrate_id = 6 + else: + raise Exception('baudrate not implemented') + + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + ReadAttribute.BAUDRATE.value, baudrate_id) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def _read_value(self, motor_id, attribute: ReadAttribute, num_bytes: int, tries=10): + try: + if num_bytes == 1: + value, dxl_comm_result, dxl_error = self.packetHandler.read1ByteTxRx(self.portHandler, + motor_id, + attribute.value) + elif num_bytes == 2: + value, dxl_comm_result, dxl_error = self.packetHandler.read2ByteTxRx(self.portHandler, + motor_id, + attribute.value) + elif num_bytes == 4: + value, dxl_comm_result, dxl_error = self.packetHandler.read4ByteTxRx(self.portHandler, + motor_id, + attribute.value) + except Exception: + if tries == 0: + raise Exception + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + # print("%s" % self.packetHandler.getTxRxResult(dxl_comm_result)) + raise ConnectionError(f'dxl_comm_result {dxl_comm_result} for servo {motor_id} value {value}') + else: + print(f'dynamixel read failure for servo {motor_id} trying again with {tries - 1} tries') + time.sleep(0.02) + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + elif dxl_error != 0: # # print("%s" % self.packetHandler.getRxPacketError(dxl_error)) + # raise ConnectionError(f'dxl_error {dxl_error} binary ' + "{0:b}".format(37)) + if tries == 0 and dxl_error != 128: + raise Exception(f'Failed to read value from motor {motor_id} error is {dxl_error}') + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + return value + + def set_home_position(self, motor_id: int): + print(f'setting home position for motor {motor_id}') + self.set_home_offset(motor_id, 0) + current_position = self.read_position(motor_id) + print(f'position before {current_position}') + self.set_home_offset(motor_id, -current_position) + # dynamixel.set_home_offset(motor_id, -4096) + # dynamixel.set_home_offset(motor_id, -4294964109) + current_position = self.read_position(motor_id) + # print(f'signed position {current_position - 2** 32}') + print(f'position after {current_position}') + + +if __name__ == "__main__": + dynamixel = Dynamixel.Config( + baudrate=1_000_000, + device_name='/dev/tty.usbmodem57380045631' + ).instantiate() + motor_id = 1 + pos = dynamixel.read_position(motor_id) + for i in range(10): + s = time.monotonic() + pos = dynamixel.read_position(motor_id) + delta = time.monotonic() - s + print(f'read position took {delta}') + print(f'position {pos}') \ No newline at end of file diff --git a/gym_lowcostrobot/interface/robot.py b/gym_lowcostrobot/interface/robot.py new file mode 100644 index 0000000..523d996 --- /dev/null +++ b/gym_lowcostrobot/interface/robot.py @@ -0,0 +1,175 @@ +import numpy as np +from dynamixel import Dynamixel, OperatingMode, ReadAttribute +import time +from dynamixel_sdk import GroupSyncRead, GroupSyncWrite, DXL_LOBYTE, DXL_HIBYTE, DXL_LOWORD, DXL_HIWORD +from enum import Enum, auto +from typing import Union + + +class MotorControlType(Enum): + PWM = auto() + POSITION_CONTROL = auto() + DISABLED = auto() + UNKNOWN = auto() + + +class Robot: + # def __init__(self, device_name: str, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + def __init__(self, dynamixel, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + self.servo_ids = servo_ids + self.dynamixel = dynamixel + # self.dynamixel = Dynamixel.Config(baudrate=baudrate, device_name=device_name).instantiate() + self.position_reader = GroupSyncRead( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + ReadAttribute.POSITION.value, + 4) + for id in self.servo_ids: + self.position_reader.addParam(id) + + self.velocity_reader = GroupSyncRead( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + ReadAttribute.VELOCITY.value, + 4) + for id in self.servo_ids: + self.velocity_reader.addParam(id) + + self.pos_writer = GroupSyncWrite( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + self.dynamixel.ADDR_GOAL_POSITION, + 4) + for id in self.servo_ids: + self.pos_writer.addParam(id, [2048]) + + self.pwm_writer = GroupSyncWrite( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + self.dynamixel.ADDR_GOAL_PWM, + 2) + for id in self.servo_ids: + self.pwm_writer.addParam(id, [2048]) + self._disable_torque() + self.motor_control_state = MotorControlType.DISABLED + + def read_position(self, tries=2): + """ + Reads the joint positions of the robot. 2048 is the center position. 0 and 4096 are 180 degrees in each direction. + :param tries: maximum number of tries to read the position + :return: list of joint positions in range [0, 4096] + """ + result = self.position_reader.txRxPacket() + if result != 0: + if tries > 0: + return self.read_position(tries=tries - 1) + else: + print(f'failed to read position!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') + positions = [] + for id in self.servo_ids: + position = self.position_reader.getData(id, ReadAttribute.POSITION.value, 4) + if position > 2 ** 31: + position -= 2 ** 32 + positions.append(position) + return positions + + def read_velocity(self): + """ + Reads the joint velocities of the robot. + :return: list of joint velocities, + """ + self.velocity_reader.txRxPacket() + velocties = [] + for id in self.servo_ids: + velocity = self.velocity_reader.getData(id, ReadAttribute.VELOCITY.value, 4) + if velocity > 2 ** 31: + velocity -= 2 ** 32 + velocties.append(velocity) + return velocties + + def set_goal_pos(self, action): + """ + + :param action: list or numpy array of target joint positions in range [0, 4096] + """ + if not self.motor_control_state is MotorControlType.POSITION_CONTROL: + self._set_position_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + DXL_LOBYTE(DXL_HIWORD(action[i])), + DXL_HIBYTE(DXL_HIWORD(action[i]))] + self.pos_writer.changeParam(motor_id, data_write) + + self.pos_writer.txPacket() + + def set_pwm(self, action): + """ + Sets the pwm values for the servos. + :param action: list or numpy array of pwm values in range [0, 885] + """ + if not self.motor_control_state is MotorControlType.PWM: + self._set_pwm_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + ] + self.pwm_writer.changeParam(motor_id, data_write) + + self.pwm_writer.txPacket() + + def set_trigger_torque(self): + """ + Sets a constant torque torque for the last servo in the chain. This is useful for the trigger of the leader arm + """ + self.dynamixel._enable_torque(self.servo_ids[-1]) + self.dynamixel.set_pwm_value(self.servo_ids[-1], 200) + + def limit_pwm(self, limit: Union[int, list, np.ndarray]): + """ + Limits the pwm values for the servos in for position control + @param limit: 0 ~ 885 + @return: + """ + if isinstance(limit, int): + limits = [limit, ] * 5 + else: + limits = limit + self._disable_torque() + for motor_id, limit in zip(self.servo_ids, limits): + self.dynamixel.set_pwm_limit(motor_id, limit) + self._enable_torque() + + def _disable_torque(self): + print(f'disabling torque for servos {self.servo_ids}') + for motor_id in self.servo_ids: + self.dynamixel._disable_torque(motor_id) + + def _enable_torque(self): + print(f'enabling torque for servos {self.servo_ids}') + for motor_id in self.servo_ids: + self.dynamixel._enable_torque(motor_id) + + def _set_pwm_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.PWM) + self._enable_torque() + self.motor_control_state = MotorControlType.PWM + + def _set_position_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.POSITION) + self._enable_torque() + self.motor_control_state = MotorControlType.POSITION_CONTROL + + +if __name__ == "__main__": + robot = Robot(device_name='/dev/tty.usbmodem57380045631') + robot._disable_torque() + for _ in range(10000): + s = time.time() + pos = robot.read_position() + elapsed = time.time() - s + print(f'read took {elapsed} pos {pos}') \ No newline at end of file diff --git a/gym_lowcostrobot/simulated_robot.py b/gym_lowcostrobot/interface/simulated_robot.py similarity index 100% rename from gym_lowcostrobot/simulated_robot.py rename to gym_lowcostrobot/interface/simulated_robot.py