diff --git a/examples/gym_test_ik.py b/examples/gym_test_ik.py new file mode 100644 index 0000000..5403b20 --- /dev/null +++ b/examples/gym_test_ik.py @@ -0,0 +1,44 @@ +import gymnasium as gym +import gym_lowcostrobot # Import the low-cost robot environments +import numpy as np + +def displace_object(square_size=0.15, invert_y=False, origin_pos=[0, 0.1]): + ### Sample a position in a square in front of the robot + if not invert_y: + x = np.random.uniform(origin_pos[0] - square_size / 2, origin_pos[0] + square_size / 2) + y = np.random.uniform(origin_pos[1] - square_size / 2, origin_pos[1] + square_size / 2) + else: + x = np.random.uniform(origin_pos[0] + square_size / 2, origin_pos[0] - square_size / 2) + y = np.random.uniform(origin_pos[1] + square_size / 2, origin_pos[1] - square_size / 2) + env.data.qpos[:3] = np.array([x, y, origin_pos[2]]) + return env.data.qpos[:3] + +if __name__ == "__main__": + + # Create the environment + env = gym.make("ReachCube-v0", render_mode="human", action_mode="ee") + + # Reset the environment + observation, info = env.reset() + cube_origin_pos = env.data.qpos[:3].astype(np.float32) + for i in range(10000): + if i % 500 == 0: + cube_pos = displace_object(square_size=0.2, invert_y=False, origin_pos=cube_origin_pos) + + # Sample random action + action = env.action_space.sample() + ee_id = env.model.body("moving_side").id + ee_pos = env.data.xpos[ee_id].astype(np.float32) # default [0.03390873 0.22571199 0.14506643] + action[:3] = cube_pos + [0,0,0.1] - ee_pos + # action[:3] = [0.03390873, 0.22571199, 0.14506643] + action[3] = -1.5 + + # Step the environment + observation, reward, terminted, truncated, info = env.step(action) + + # Reset the environment if it's done + if terminted or truncated: + observation, info = env.reset() + + # Close the environment + env.close() \ No newline at end of file diff --git a/examples/lerobotdataset_load.py b/examples/lerobotdataset_load.py index 07ba239..05a015b 100644 --- a/examples/lerobotdataset_load.py +++ b/examples/lerobotdataset_load.py @@ -5,8 +5,6 @@ from lerobot.common.datasets.lerobot_dataset import LeRobotDataset - - def main(): # You can easily load a dataset from a Hugging Face repository diff --git a/examples/mujoco_simple_invk.py b/examples/mujoco_simple_invk.py index a6adbca..2324092 100644 --- a/examples/mujoco_simple_invk.py +++ b/examples/mujoco_simple_invk.py @@ -5,7 +5,7 @@ import mujoco.viewer import numpy as np -from gym_lowcostrobot.simulated_robot import LevenbegMarquardtIK +from gym_lowcostrobot.interface.simulated_robot import LevenbegMarquardtIK def displace_object(data, m, object_id, viewer, square_size=0.2, invert_y=False, origin_pos=[0, 0.1]): diff --git a/examples/simple.py b/examples/simple.py deleted file mode 100644 index 097fec9..0000000 --- a/examples/simple.py +++ /dev/null @@ -1,15 +0,0 @@ -import gymnasium as gym - -import gym_lowcostrobot # noqa - -env = gym.make("LiftCube-v0", render_mode=None, observation_mode="state", action_mode="ee") - -env.reset() - -for _ in range(1000): - action = env.action_space.sample() - observation, reward, terminated, truncated, info = env.step(action) - print(observation, reward, terminated, truncated, info) - if terminated or truncated: - env.reset() -env.close() diff --git a/examples/teleoperate_simulated_robot.py b/examples/teleoperate_simulated_robot.py new file mode 100644 index 0000000..101efeb --- /dev/null +++ b/examples/teleoperate_simulated_robot.py @@ -0,0 +1,54 @@ +import time +import numpy as np +import threading +import mujoco.viewer + +from gym_lowcostrobot.interface.dynamixel import Dynamixel +from gym_lowcostrobot.interface.robot import Robot +from gym_lowcostrobot.interface.simulated_robot import SimulatedRobot + +def read_leader_position(): + global target_pos + while True: + target_pos = np.array(leader.read_position()) + target_pos = (target_pos / 2048 - 1) * 3.14 + target_pos[1] = -target_pos[1] + target_pos[3] = -target_pos[3] + target_pos[4] = -target_pos[4] + +if __name__ == '__main__': + + leader_dynamixel = Dynamixel.Config(baudrate=1_000_000, device_name='/dev/tty.usbmodem57380045631').instantiate() + leader = Robot(leader_dynamixel, servo_ids=[1, 2, 3, 6, 7]) + leader.set_trigger_torque() + + m = mujoco.MjModel.from_xml_path('gym_lowcostrobot/assets/low_cost_robot_6dof/reach_cube.xml') + d = mujoco.MjData(m) + r = SimulatedRobot(m, d) + + target_pos = np.zeros(5) + + # Start the thread for reading leader position + leader_thread = threading.Thread(target=read_leader_position) + leader_thread.daemon = True + leader_thread.start() + + with mujoco.viewer.launch_passive(m, d) as viewer: + start = time.time() + while viewer.is_running(): + # Use the latest target_pos + step_start = time.time() + target_pos_local = target_pos.copy() + # print(f'target pos copy {time.time() - step_start}') + r.set_target_pos(target_pos_local) + # print(f'set targtee pos copy {time.time() - step_start}') + mujoco.mj_step(m, d) + # print(f'mjstep {time.time() - step_start}') + viewer.sync() + # print(f'viewer sync {time.time() - step_start}') + + # Rudimentary time keeping, will drift relative to wall clock. + time_until_next_step = m.opt.timestep - (time.time() - step_start) + # print(f'time until next step {time_until_next_step}') + if time_until_next_step > 0: + time.sleep(time_until_next_step) \ No newline at end of file diff --git a/examples/test_ik.py b/examples/test_ik.py deleted file mode 100644 index d044f4a..0000000 --- a/examples/test_ik.py +++ /dev/null @@ -1,41 +0,0 @@ -import gymnasium as gym -import gym_lowcostrobot # Import the low-cost robot environments -import numpy as np - -def displace_object(square_size=0.15, invert_y=False, origin_pos=[0, 0.1]): - ### Sample a position in a square in front of the robot - if not invert_y: - x = np.random.uniform(origin_pos[0] - square_size / 2, origin_pos[0] + square_size / 2) - y = np.random.uniform(origin_pos[1] - square_size / 2, origin_pos[1] + square_size / 2) - else: - x = np.random.uniform(origin_pos[0] + square_size / 2, origin_pos[0] - square_size / 2) - y = np.random.uniform(origin_pos[1] + square_size / 2, origin_pos[1] - square_size / 2) - env.data.qpos[:3] = np.array([x, y, origin_pos[2]]) - return env.data.qpos[:3] - -# Create the environment -env = gym.make("ReachCube-v0", render_mode="human", action_mode="ee") - -# Reset the environment -observation, info = env.reset() -cube_origin_pos = env.data.qpos[:3].astype(np.float32) -for i in range(10000): - if i % 500 == 0: - cube_pos = displace_object(square_size=0.2, invert_y=False, origin_pos=cube_origin_pos) - # Sample random action - action = env.action_space.sample() - ee_id = env.model.body("moving_side").id - ee_pos = env.data.xpos[ee_id].astype(np.float32) # default [0.03390873 0.22571199 0.14506643] - action[:3] = cube_pos + [0,0,0.1] - ee_pos - # action[:3] = [0.03390873, 0.22571199, 0.14506643] - action[3] = -1.5 - - # Step the environment - observation, reward, terminted, truncated, info = env.step(action) - - # Reset the environment if it's done - if terminted or truncated: - observation, info = env.reset() - -# Close the environment -env.close() \ No newline at end of file diff --git a/gym_lowcostrobot/interface/dynamixel.py b/gym_lowcostrobot/interface/dynamixel.py new file mode 100644 index 0000000..9942504 --- /dev/null +++ b/gym_lowcostrobot/interface/dynamixel.py @@ -0,0 +1,313 @@ +from __future__ import annotations +import math +import os +from dynamixel_sdk import * # Uses Dynamixel SDK library +from dataclasses import dataclass +import enum + + +class ReadAttribute(enum.Enum): + TEMPERATURE = 146 + VOLTAGE = 145 + VELOCITY = 128 + POSITION = 132 + CURRENT = 126 + PWM = 124 + HARDWARE_ERROR_STATUS = 70 + HOMING_OFFSET = 20 + BAUDRATE = 8 + + +class OperatingMode(enum.Enum): + VELOCITY = 1 + POSITION = 3 + CURRENT_CONTROLLED_POSITION = 5 + PWM = 16 + UNKNOWN = -1 + + +class Dynamixel: + ADDR_TORQUE_ENABLE = 64 + ADDR_GOAL_POSITION = 116 + ADDR_VELOCITY_LIMIT = 44 + ADDR_GOAL_PWM = 100 + OPERATING_MODE_ADDR = 11 + POSITION_I = 82 + POSITION_P = 84 + ADDR_ID = 7 + + @dataclass + class Config: + def instantiate(self): return Dynamixel(self) + + baudrate: int = 57600 + protocol_version: float = 2.0 + device_name: str = '' # /dev/tty.usbserial-1120' + dynamixel_id: int = 1 + + def __init__(self, config: Config): + self.config = config + self.connect() + + def connect(self): + if self.config.device_name == '': + for port_name in os.listdir('/dev'): + if 'ttyUSB' in port_name or 'ttyACM' in port_name: + self.config.device_name = '/dev/' + port_name + print(f'using device {self.config.device_name}') + self.portHandler = PortHandler(self.config.device_name) + # self.portHandler.LA + self.packetHandler = PacketHandler(self.config.protocol_version) + if not self.portHandler.openPort(): + raise Exception(f'Failed to open port {self.config.device_name}') + + if not self.portHandler.setBaudRate(self.config.baudrate): + raise Exception(f'failed to set baudrate to {self.config.baudrate}') + + # self.operating_mode = OperatingMode.UNKNOWN + # self.torque_enabled = False + # self._disable_torque() + + self.operating_modes = [None for _ in range(32)] + self.torque_enabled = [None for _ in range(32)] + return True + + def disconnect(self): + self.portHandler.closePort() + + def set_goal_position(self, motor_id, goal_position): + # if self.operating_modes[motor_id] is not OperatingMode.POSITION: + # self._disable_torque(motor_id) + # self.set_operating_mode(motor_id, OperatingMode.POSITION) + + # if not self.torque_enabled[motor_id]: + # self._enable_torque(motor_id) + + # self._enable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + self.ADDR_GOAL_POSITION, goal_position) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set position of motor {motor_id} to {goal_position}') + + def set_pwm_value(self, motor_id: int, pwm_value, tries=3): + if self.operating_modes[motor_id] is not OperatingMode.PWM: + self._disable_torque(motor_id) + self.set_operating_mode(motor_id, OperatingMode.PWM) + + if not self.torque_enabled[motor_id]: + self._enable_torque(motor_id) + # print(f'enabling torque') + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.ADDR_GOAL_PWM, pwm_value) + # self._process_response(dxl_comm_result, dxl_error) + # print(f'set pwm of motor {motor_id} to {pwm_value}') + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + raise ConnectionError(f"dxl_comm_result: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + else: + print(f'dynamixel pwm setting failure trying again with {tries - 1} tries') + self.set_pwm_value(motor_id, pwm_value, tries=tries - 1) + elif dxl_error != 0: + print(f'dxl error {dxl_error}') + raise ConnectionError(f"dynamixel error: {self.packetHandler.getTxRxResult(dxl_error)}") + + def read_temperature(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.TEMPERATURE, 1) + + def read_velocity(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.VELOCITY, 4) + if pos > 2 ** 31: + pos -= 2 ** 32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position(self, motor_id: int): + pos = self._read_value(motor_id, ReadAttribute.POSITION, 4) + if pos > 2 ** 31: + pos -= 2 ** 32 + # print(f'read position {pos} for motor {motor_id}') + return pos + + def read_position_degrees(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 360 + + def read_position_radians(self, motor_id: int) -> float: + return (self.read_position(motor_id) / 4096) * 2 * math.pi + + def read_current(self, motor_id: int): + current = self._read_value(motor_id, ReadAttribute.CURRENT, 2) + if current > 2 ** 15: + current -= 2 ** 16 + return current + + def read_present_pwm(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.PWM, 2) + + def read_hardware_error_status(self, motor_id: int): + return self._read_value(motor_id, ReadAttribute.HARDWARE_ERROR_STATUS, 1) + + def disconnect(self): + self.portHandler.closePort() + + def set_id(self, old_id, new_id, use_broadcast_id: bool = False): + """ + sets the id of the dynamixel servo + @param old_id: current id of the servo + @param new_id: new id + @param use_broadcast_id: set ids of all connected dynamixels if True. + If False, change only servo with self.config.id + @return: + """ + if use_broadcast_id: + current_id = 254 + else: + current_id = old_id + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, + current_id, self.ADDR_ID, new_id) + self._process_response(dxl_comm_result, dxl_error, old_id) + self.config.id = id + + def _enable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + self.ADDR_TORQUE_ENABLE, 1) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = True + + def _disable_torque(self, motor_id): + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + self.ADDR_TORQUE_ENABLE, 0) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.torque_enabled[motor_id] = False + + def _process_response(self, dxl_comm_result: int, dxl_error: int, motor_id: int): + if dxl_comm_result != COMM_SUCCESS: + raise ConnectionError( + f"dxl_comm_result for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_comm_result)}") + elif dxl_error != 0: + print(f'dxl error {dxl_error}') + raise ConnectionError( + f"dynamixel error for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_error)}") + + def set_operating_mode(self, motor_id: int, operating_mode: OperatingMode): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.OPERATING_MODE_ADDR, operating_mode.value) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self.operating_modes[motor_id] = operating_mode + + def set_pwm_limit(self, motor_id: int, limit: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + 36, limit) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_velocity_limit(self, motor_id: int, velocity_limit): + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + self.ADDR_VELOCITY_LIMIT, velocity_limit) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_P(self, motor_id: int, P: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.POSITION_P, P) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def set_I(self, motor_id: int, I: int): + dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx(self.portHandler, motor_id, + self.POSITION_I, I) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def read_home_offset(self, motor_id: int): + self._disable_torque(motor_id) + # dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + # ReadAttribute.HOMING_OFFSET.value, home_position) + home_offset = self._read_value(motor_id, ReadAttribute.HOMING_OFFSET, 4) + # self._process_response(dxl_comm_result, dxl_error) + self._enable_torque(motor_id) + return home_offset + + def set_home_offset(self, motor_id: int, home_position: int): + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, + ReadAttribute.HOMING_OFFSET.value, home_position) + self._process_response(dxl_comm_result, dxl_error, motor_id) + self._enable_torque(motor_id) + + def set_baudrate(self, motor_id: int, baudrate): + # translate baudrate into dynamixel baudrate setting id + if baudrate == 57600: + baudrate_id = 1 + elif baudrate == 1_000_000: + baudrate_id = 3 + elif baudrate == 2_000_000: + baudrate_id = 4 + elif baudrate == 3_000_000: + baudrate_id = 5 + elif baudrate == 4_000_000: + baudrate_id = 6 + else: + raise Exception('baudrate not implemented') + + self._disable_torque(motor_id) + dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx(self.portHandler, motor_id, + ReadAttribute.BAUDRATE.value, baudrate_id) + self._process_response(dxl_comm_result, dxl_error, motor_id) + + def _read_value(self, motor_id, attribute: ReadAttribute, num_bytes: int, tries=10): + try: + if num_bytes == 1: + value, dxl_comm_result, dxl_error = self.packetHandler.read1ByteTxRx(self.portHandler, + motor_id, + attribute.value) + elif num_bytes == 2: + value, dxl_comm_result, dxl_error = self.packetHandler.read2ByteTxRx(self.portHandler, + motor_id, + attribute.value) + elif num_bytes == 4: + value, dxl_comm_result, dxl_error = self.packetHandler.read4ByteTxRx(self.portHandler, + motor_id, + attribute.value) + except Exception: + if tries == 0: + raise Exception + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + if dxl_comm_result != COMM_SUCCESS: + if tries <= 1: + # print("%s" % self.packetHandler.getTxRxResult(dxl_comm_result)) + raise ConnectionError(f'dxl_comm_result {dxl_comm_result} for servo {motor_id} value {value}') + else: + print(f'dynamixel read failure for servo {motor_id} trying again with {tries - 1} tries') + time.sleep(0.02) + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + elif dxl_error != 0: # # print("%s" % self.packetHandler.getRxPacketError(dxl_error)) + # raise ConnectionError(f'dxl_error {dxl_error} binary ' + "{0:b}".format(37)) + if tries == 0 and dxl_error != 128: + raise Exception(f'Failed to read value from motor {motor_id} error is {dxl_error}') + else: + return self._read_value(motor_id, attribute, num_bytes, tries=tries - 1) + return value + + def set_home_position(self, motor_id: int): + print(f'setting home position for motor {motor_id}') + self.set_home_offset(motor_id, 0) + current_position = self.read_position(motor_id) + print(f'position before {current_position}') + self.set_home_offset(motor_id, -current_position) + # dynamixel.set_home_offset(motor_id, -4096) + # dynamixel.set_home_offset(motor_id, -4294964109) + current_position = self.read_position(motor_id) + # print(f'signed position {current_position - 2** 32}') + print(f'position after {current_position}') + + +if __name__ == "__main__": + dynamixel = Dynamixel.Config( + baudrate=1_000_000, + device_name='/dev/tty.usbmodem57380045631' + ).instantiate() + motor_id = 1 + pos = dynamixel.read_position(motor_id) + for i in range(10): + s = time.monotonic() + pos = dynamixel.read_position(motor_id) + delta = time.monotonic() - s + print(f'read position took {delta}') + print(f'position {pos}') \ No newline at end of file diff --git a/gym_lowcostrobot/interface/robot.py b/gym_lowcostrobot/interface/robot.py new file mode 100644 index 0000000..523d996 --- /dev/null +++ b/gym_lowcostrobot/interface/robot.py @@ -0,0 +1,175 @@ +import numpy as np +from dynamixel import Dynamixel, OperatingMode, ReadAttribute +import time +from dynamixel_sdk import GroupSyncRead, GroupSyncWrite, DXL_LOBYTE, DXL_HIBYTE, DXL_LOWORD, DXL_HIWORD +from enum import Enum, auto +from typing import Union + + +class MotorControlType(Enum): + PWM = auto() + POSITION_CONTROL = auto() + DISABLED = auto() + UNKNOWN = auto() + + +class Robot: + # def __init__(self, device_name: str, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + def __init__(self, dynamixel, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + self.servo_ids = servo_ids + self.dynamixel = dynamixel + # self.dynamixel = Dynamixel.Config(baudrate=baudrate, device_name=device_name).instantiate() + self.position_reader = GroupSyncRead( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + ReadAttribute.POSITION.value, + 4) + for id in self.servo_ids: + self.position_reader.addParam(id) + + self.velocity_reader = GroupSyncRead( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + ReadAttribute.VELOCITY.value, + 4) + for id in self.servo_ids: + self.velocity_reader.addParam(id) + + self.pos_writer = GroupSyncWrite( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + self.dynamixel.ADDR_GOAL_POSITION, + 4) + for id in self.servo_ids: + self.pos_writer.addParam(id, [2048]) + + self.pwm_writer = GroupSyncWrite( + self.dynamixel.portHandler, + self.dynamixel.packetHandler, + self.dynamixel.ADDR_GOAL_PWM, + 2) + for id in self.servo_ids: + self.pwm_writer.addParam(id, [2048]) + self._disable_torque() + self.motor_control_state = MotorControlType.DISABLED + + def read_position(self, tries=2): + """ + Reads the joint positions of the robot. 2048 is the center position. 0 and 4096 are 180 degrees in each direction. + :param tries: maximum number of tries to read the position + :return: list of joint positions in range [0, 4096] + """ + result = self.position_reader.txRxPacket() + if result != 0: + if tries > 0: + return self.read_position(tries=tries - 1) + else: + print(f'failed to read position!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') + positions = [] + for id in self.servo_ids: + position = self.position_reader.getData(id, ReadAttribute.POSITION.value, 4) + if position > 2 ** 31: + position -= 2 ** 32 + positions.append(position) + return positions + + def read_velocity(self): + """ + Reads the joint velocities of the robot. + :return: list of joint velocities, + """ + self.velocity_reader.txRxPacket() + velocties = [] + for id in self.servo_ids: + velocity = self.velocity_reader.getData(id, ReadAttribute.VELOCITY.value, 4) + if velocity > 2 ** 31: + velocity -= 2 ** 32 + velocties.append(velocity) + return velocties + + def set_goal_pos(self, action): + """ + + :param action: list or numpy array of target joint positions in range [0, 4096] + """ + if not self.motor_control_state is MotorControlType.POSITION_CONTROL: + self._set_position_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + DXL_LOBYTE(DXL_HIWORD(action[i])), + DXL_HIBYTE(DXL_HIWORD(action[i]))] + self.pos_writer.changeParam(motor_id, data_write) + + self.pos_writer.txPacket() + + def set_pwm(self, action): + """ + Sets the pwm values for the servos. + :param action: list or numpy array of pwm values in range [0, 885] + """ + if not self.motor_control_state is MotorControlType.PWM: + self._set_pwm_control() + for i, motor_id in enumerate(self.servo_ids): + data_write = [DXL_LOBYTE(DXL_LOWORD(action[i])), + DXL_HIBYTE(DXL_LOWORD(action[i])), + ] + self.pwm_writer.changeParam(motor_id, data_write) + + self.pwm_writer.txPacket() + + def set_trigger_torque(self): + """ + Sets a constant torque torque for the last servo in the chain. This is useful for the trigger of the leader arm + """ + self.dynamixel._enable_torque(self.servo_ids[-1]) + self.dynamixel.set_pwm_value(self.servo_ids[-1], 200) + + def limit_pwm(self, limit: Union[int, list, np.ndarray]): + """ + Limits the pwm values for the servos in for position control + @param limit: 0 ~ 885 + @return: + """ + if isinstance(limit, int): + limits = [limit, ] * 5 + else: + limits = limit + self._disable_torque() + for motor_id, limit in zip(self.servo_ids, limits): + self.dynamixel.set_pwm_limit(motor_id, limit) + self._enable_torque() + + def _disable_torque(self): + print(f'disabling torque for servos {self.servo_ids}') + for motor_id in self.servo_ids: + self.dynamixel._disable_torque(motor_id) + + def _enable_torque(self): + print(f'enabling torque for servos {self.servo_ids}') + for motor_id in self.servo_ids: + self.dynamixel._enable_torque(motor_id) + + def _set_pwm_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.PWM) + self._enable_torque() + self.motor_control_state = MotorControlType.PWM + + def _set_position_control(self): + self._disable_torque() + for motor_id in self.servo_ids: + self.dynamixel.set_operating_mode(motor_id, OperatingMode.POSITION) + self._enable_torque() + self.motor_control_state = MotorControlType.POSITION_CONTROL + + +if __name__ == "__main__": + robot = Robot(device_name='/dev/tty.usbmodem57380045631') + robot._disable_torque() + for _ in range(10000): + s = time.time() + pos = robot.read_position() + elapsed = time.time() - s + print(f'read took {elapsed} pos {pos}') \ No newline at end of file diff --git a/gym_lowcostrobot/simulated_robot.py b/gym_lowcostrobot/interface/simulated_robot.py similarity index 100% rename from gym_lowcostrobot/simulated_robot.py rename to gym_lowcostrobot/interface/simulated_robot.py