From 8e05497b114ac220dc28ad6e1ea9f9e85ff4ed53 Mon Sep 17 00:00:00 2001 From: jess-moss Date: Wed, 28 Aug 2024 09:01:09 -0500 Subject: [PATCH 1/2] commit. --- .../common/robot_devices/motors/hiwonder.py | 142 ++++++++++++++++++ lerobot/scripts/read_hiwonder_motors2.py | 37 +++++ 2 files changed, 179 insertions(+) create mode 100644 lerobot/common/robot_devices/motors/hiwonder.py create mode 100644 lerobot/scripts/read_hiwonder_motors2.py diff --git a/lerobot/common/robot_devices/motors/hiwonder.py b/lerobot/common/robot_devices/motors/hiwonder.py new file mode 100644 index 000000000..de9edb98a --- /dev/null +++ b/lerobot/common/robot_devices/motors/hiwonder.py @@ -0,0 +1,142 @@ +import time +import traceback + +import serial + +from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError + + +class HiwonderMotorsBus: + """ + The HiwonderMotorBus class allows to efficiently read and write to the attached motors. It relies on + the [Hiwonder Bus Communication Protoco](https://drive.google.com/file/d/1JKyt_OUg9V6cIBC-SiX6IIAACsvz86aB/view?usp=sharing). + + A HiwonderMotorBus instance requires a port (e.g. `HiwonderMotorBus(port="/dev/tty.usbmodem575E0031751"`)). + + Example of usage for 1 motor connected to the bus: + ```python + motor_name = "gripper" + motor_index = 6 + motor_model = "lx-16a" + + motors_bus = HiwonderMotorBus( + port="/dev/tty.usbmodem575E0031751", + motors={motor_name: (motor_index, motor_model)}, + ) + motors_bus.connect() + + position = motors_bus.read("Present_Position") + + # move from a few motor steps as an example + few_steps = 30 + motors_bus.write("Goal_Position", position + few_steps) + + # when done, consider disconnecting + motors_bus.disconnect() + ``` + """ + + def __init__(self, port): + self.port = port + self.serial = None + self.is_connected = False + + def connect(self): + """Open the serial port and establish a connection.""" + if self.is_connected: + raise RobotDeviceAlreadyConnectedError( + f"HiwonderMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." + ) + try: + # TODO made baudrate, timeout constant + self.serial = serial.Serial(self.port, baudrate=115200, timeout=1) + self.is_connected = True + print("Connection established") + except Exception: + traceback.print_exc() + print("\nCould not open port.\n") + raise + + def disconnect(self): + """Close the serial port connection.""" + if self.serial: + self.serial.close() + self.is_connected = False + print("Connection closed") + + def calculate_checksum(self, data): + """Calculate the checksum for the given data.""" + checksum = ~(sum(data)) & 0xFF + return checksum + + def write_position(self, motor_id, position, duration): + """Send a command to move the motor to a specific position over a specified duration.""" + if not self.is_connected: + print("Serial port not connected.") + return + + # Command structure: 55 55 ID LEN CMD P1 P2 P3 P4 CHK + cmd = 0x01 # Assuming command for move + length = 0x07 # Length of the remaining bytes + pos_low = position & 0xFF + pos_high = (position >> 8) & 0xFF + dur_low = duration & 0xFF + dur_high = (duration >> 8) & 0xFF + + # Prepare the packet + data = [motor_id, length, cmd, pos_low, pos_high, dur_low, dur_high] + checksum = self.calculate_checksum(data) + command = [0x55, 0x55] + data + [checksum] + + # Send the packet + self.serial.write(bytearray(command)) + print(f"Position write command sent: {' '.join(f'{byte:02X}' for byte in command)}") + + def read_position(self, motor_id): + """Send a command to read the current position of the motor.""" + if not self.is_connected: + print("Serial port not connected.") + return + + # Command structure for reading position: 55 55 ID LEN CMD CHK + cmd = 0x1C # Command value for reading position + length = 0x03 # Length is always 3 for the read position command + + # Prepare the command packet + data = [motor_id, length, cmd] + checksum = self.calculate_checksum(data) + command = [0x55, 0x55] + data + [checksum] + + # Send the command + self.serial.write(bytearray(command)) + print(f"Position read command sent: {' '.join(f'{byte:02X}' for byte in command)}") + + # Wait for response and handle it + try: + response = self.serial.read(8) # Adjust size based on expected response + if len(response) == 8: + header1, header2, resp_id, resp_len, resp_cmd, param1, param2, resp_chk = response + # Calculate expected checksum and compare + expected_checksum = self.calculate_checksum(response[2:-1]) + if resp_chk == expected_checksum: + # Combine param1 and param2 to form the actual position + position = param1 + (param2 << 8) + print(f"Current position of motor {motor_id}: {position}") + return position + else: + print("Checksum mismatch.") + else: + print("No response or invalid response") + except Exception as e: + print(f"Failed to read position: {e}") + + +# Example usage +if __name__ == "__main__": + hw_bus = HiwonderMotorsBus("/dev/ttyUSB0") # Adjust as necessary for your setup + hw_bus.connect() + hw_bus.send_command(1, 1000, 2000) # Move motor ID 1 to position 1000 in 2000 ms + time.sleep(2.5) # Wait for the movement to complete + hw_bus.send_command(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms + time.sleep(2.5) # Wait for the movement to complete + hw_bus.disconnect() diff --git a/lerobot/scripts/read_hiwonder_motors2.py b/lerobot/scripts/read_hiwonder_motors2.py new file mode 100644 index 000000000..e9896b014 --- /dev/null +++ b/lerobot/scripts/read_hiwonder_motors2.py @@ -0,0 +1,37 @@ +import time + +from lerobot.common.robot_devices.motors.hiwonder import ( + HiwonderMotorsBus, # assuming the class is saved in hiwonder_motors_bus.py +) + + +def main(): + # Replace '/dev/ttyUSB0' with your actual port + # Try connecting with 9600 baud rate + hw_bus = HiwonderMotorsBus("/dev/ttyUSB0") + hw_bus.connect() + # hw_bus.read_motor_id() + hw_bus.write_position(1, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(1) + hw_bus.write_position(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(1) + hw_bus.write_position(1, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(1) + + hw_bus.write_position(2, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(2) + hw_bus.write_position(2, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(2) + hw_bus.write_position(2, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms + time.sleep(3.0) # Wait for the movement to complete + hw_bus.read_position(2) + hw_bus.disconnect() + + +if __name__ == "__main__": + main() From 699adc2130daa462f5e3ed9234e49806767ab165 Mon Sep 17 00:00:00 2001 From: jess-moss Date: Wed, 28 Aug 2024 14:42:39 -0500 Subject: [PATCH 2/2] Things are looking good. --- .../common/robot_devices/motors/hiwonder.py | 223 ++++++++++++------ lerobot/scripts/read_hiwonder_motors2.py | 37 --- 2 files changed, 149 insertions(+), 111 deletions(-) delete mode 100644 lerobot/scripts/read_hiwonder_motors2.py diff --git a/lerobot/common/robot_devices/motors/hiwonder.py b/lerobot/common/robot_devices/motors/hiwonder.py index de9edb98a..91ae5f85f 100644 --- a/lerobot/common/robot_devices/motors/hiwonder.py +++ b/lerobot/common/robot_devices/motors/hiwonder.py @@ -1,9 +1,64 @@ -import time import traceback +import numpy as np import serial -from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError +from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError + +BAUDRATE = 115_200 +TIMEOUT_S = 1 +COMMAND_HEADER = [0x55, 0x55] # Header for all commands sent to Hiwonder motors +BYTE_MASK = 0xFF + +# https://drive.google.com/file/d/1DQGBBng8UFziv5hg15qmyvakItG3w6Ss/view?usp=sharing + +# data_name: (command, length) +HIWONDER_CONTROL_TABLE = { + "SERVO_MOVE_TIME_WRITE": (0x01, 0x07), + "SERVO_MOVE_TIME_READ": (0x02, 0x03), + "SERVO_MOVE_TIME_WAIT_WRITE": (0x07, 0x07), + "SERVO_MOVE_TIME_WAIT_READ": (0x08, 0x03), + "SERVO_MOVE_START": (0x0B, 0x03), + "SERVO_MOVE_STOP": (0x0C, 0x03), + "SERVO_ID_WRITE": (0x0D, 0x04), + "SERVO_ID_READ": (0x0E, 0x03), + "SERVO_ANGLE_OFFSET_ADJUST": (0x11, 0x04), + "SERVO_ANGLE_OFFSET_WRITE": (0x12, 0x03), + "SERVO_ANGLE_OFFSET_READ": (0x13, 0x03), + "SERVO_ANGLE_LIMIT_WRITE": (0x14, 0x07), + "SERVO_ANGLE_LIMIT_READ": (0x15, 0x03), + "SERVO_VIN_LIMIT_WRITE": (0x16, 0x07), + "SERVO_VIN_LIMIT_READ": (0x17, 0x03), + "SERVO_TEMP_MAX_LIMIT_WRITE": (0x18, 0x04), + "SERVO_TEMP_MAX_LIMIT_READ": (0x19, 0x03), + "SERVO_TEMP_READ": (0x1A, 0x03), + "SERVO_VIN_READ": (0x1B, 0x03), + "SERVO_POS_READ": (0x1C, 0x03), + "SERVO_OR_MOTOR_MODE_WRITE": (0x1D, 0x07), + "SERVO_OR_MOTOR_MODE_READ": (0x1E, 0x03), + "SERVO_LOAD_OR_UNLOAD_WRITE": (0x1F, 0x04), + "SERVO_LOAD_OR_UNLOAD_READ": (0x20, 0x03), + "SERVO_LED_CTRL_WRITE": (0x21, 0x04), + "SERVO_LED_CTRL_READ": (0x22, 0x03), + "SERVO_LED_ERROR_WRITE": (0x23, 0x04), + "SERVO_LED_ERROR_READ": (0x24, 0x03), +} + + +def calculate_checksum(data): + """Calculate the checksum for the given data.""" + checksum = ~(sum(data)) & BYTE_MASK + return checksum + + +def low_byte(value): + """Extract the low byte of a 16-bit integer.""" + return value & BYTE_MASK + + +def high_byte(value): + """Extract the high byte of a 16-bit integer.""" + return (value >> 8) & BYTE_MASK class HiwonderMotorsBus: @@ -25,21 +80,22 @@ class HiwonderMotorsBus: ) motors_bus.connect() - position = motors_bus.read("Present_Position") + position = motors_bus.read() # move from a few motor steps as an example few_steps = 30 - motors_bus.write("Goal_Position", position + few_steps) + motors_bus.write(position + few_steps) # when done, consider disconnecting motors_bus.disconnect() ``` """ - def __init__(self, port): + def __init__(self, port: str, motors: dict[str, tuple[int, str]]): self.port = port self.serial = None self.is_connected = False + self.motors = motors def connect(self): """Open the serial port and establish a connection.""" @@ -48,8 +104,7 @@ def connect(self): f"HiwonderMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." ) try: - # TODO made baudrate, timeout constant - self.serial = serial.Serial(self.port, baudrate=115200, timeout=1) + self.serial = serial.Serial(self.port, baudrate=BAUDRATE, timeout=TIMEOUT_S) self.is_connected = True print("Connection established") except Exception: @@ -64,79 +119,99 @@ def disconnect(self): self.is_connected = False print("Connection closed") - def calculate_checksum(self, data): - """Calculate the checksum for the given data.""" - checksum = ~(sum(data)) & 0xFF - return checksum + @property + def motor_names(self) -> list[str]: + return list(self.motors.keys()) - def write_position(self, motor_id, position, duration): - """Send a command to move the motor to a specific position over a specified duration.""" + def write(self, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): if not self.is_connected: - print("Serial port not connected.") - return - - # Command structure: 55 55 ID LEN CMD P1 P2 P3 P4 CHK - cmd = 0x01 # Assuming command for move - length = 0x07 # Length of the remaining bytes - pos_low = position & 0xFF - pos_high = (position >> 8) & 0xFF - dur_low = duration & 0xFF - dur_high = (duration >> 8) & 0xFF - - # Prepare the packet - data = [motor_id, length, cmd, pos_low, pos_high, dur_low, dur_high] - checksum = self.calculate_checksum(data) - command = [0x55, 0x55] + data + [checksum] - - # Send the packet - self.serial.write(bytearray(command)) - print(f"Position write command sent: {' '.join(f'{byte:02X}' for byte in command)}") - - def read_position(self, motor_id): - """Send a command to read the current position of the motor.""" + raise RobotDeviceAlreadyConnectedError(f"HiwonderMotorsBus({self.port}) is not connected.") + + if motor_names is None: + motor_names = self.motor_names + + if isinstance(motor_names, str): + motor_names = [motor_names] + + if isinstance(values, (int, float)): + values = [values] * len( + motor_names + ) # Replicate value for each motor if a single value is provided + + motor_commands = zip(motor_names, values, strict=False) + + for motor_name, value in motor_commands: + motor_id, _ = self.motors[motor_name] # Using only the ID for the command + duration = 0 # Get to the next position as soon as possible + + # Command structure: 55 55 ID LEN CMD P1 P2 P3 P4 CHK + # Retrieve command and length from the control table + cmd, length = HIWONDER_CONTROL_TABLE["SERVO_MOVE_TIME_WRITE"] + # Prepare the packet + data = [ + motor_id, + length, + cmd, + low_byte(value), + high_byte(value), + low_byte(duration), + high_byte(duration), + ] + checksum = calculate_checksum(data) + command = COMMAND_HEADER + data + [checksum] + + self.serial.write(bytearray(command)) + + def read(self, motor_names: str | list[str] | None = None): + """Send a command to read the current position of the motors.""" if not self.is_connected: - print("Serial port not connected.") - return - - # Command structure for reading position: 55 55 ID LEN CMD CHK - cmd = 0x1C # Command value for reading position - length = 0x03 # Length is always 3 for the read position command - - # Prepare the command packet - data = [motor_id, length, cmd] - checksum = self.calculate_checksum(data) - command = [0x55, 0x55] + data + [checksum] - - # Send the command - self.serial.write(bytearray(command)) - print(f"Position read command sent: {' '.join(f'{byte:02X}' for byte in command)}") + raise RobotDeviceNotConnectedError( + f"HiwonderMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`." + ) - # Wait for response and handle it - try: - response = self.serial.read(8) # Adjust size based on expected response - if len(response) == 8: - header1, header2, resp_id, resp_len, resp_cmd, param1, param2, resp_chk = response - # Calculate expected checksum and compare - expected_checksum = self.calculate_checksum(response[2:-1]) - if resp_chk == expected_checksum: - # Combine param1 and param2 to form the actual position - position = param1 + (param2 << 8) - print(f"Current position of motor {motor_id}: {position}") - return position + if motor_names is None: + motor_names = self.motor_names + + if isinstance(motor_names, str): + motor_names = [motor_names] + + motor_ids = [self.motors[name][0] for name in motor_names] # Extract motor IDs based on names + + positions = {} + for motor_name, motor_id in zip(motor_names, motor_ids, strict=False): + # Command structure for reading position: 55 55 ID LEN CMD CHK + # Retrieve command and length from the control table + cmd, length = HIWONDER_CONTROL_TABLE["SERVO_POS_READ"] + + # Prepare the command packet + data = [motor_id, length, cmd] + checksum = calculate_checksum(data) + command = COMMAND_HEADER + data + [checksum] + + # Send the command + self.serial.write(bytearray(command)) + + # Wait for response and handle it + try: + response = self.serial.read(8) # Adjust size based on expected response + if len(response) == 8: + header1, header2, resp_id, resp_len, resp_cmd, param1, param2, resp_chk = response + # Calculate expected checksum and compare + expected_checksum = calculate_checksum(response[2:-1]) + if resp_chk == expected_checksum: + # Combine param1 and param2 to form the actual position + position = param1 + (param2 << 8) + positions[motor_name] = position # Store position with motor ID as the key + else: + print(f"Checksum mismatch for motor {motor_id}.") else: - print("Checksum mismatch.") - else: - print("No response or invalid response") - except Exception as e: - print(f"Failed to read position: {e}") + print(f"No response or invalid response for motor {motor_id}.") + except Exception as e: + print(f"Failed to read position for motor {motor_id}: {str(e)}") + + return positions # Example usage if __name__ == "__main__": - hw_bus = HiwonderMotorsBus("/dev/ttyUSB0") # Adjust as necessary for your setup - hw_bus.connect() - hw_bus.send_command(1, 1000, 2000) # Move motor ID 1 to position 1000 in 2000 ms - time.sleep(2.5) # Wait for the movement to complete - hw_bus.send_command(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms - time.sleep(2.5) # Wait for the movement to complete - hw_bus.disconnect() + pass diff --git a/lerobot/scripts/read_hiwonder_motors2.py b/lerobot/scripts/read_hiwonder_motors2.py deleted file mode 100644 index e9896b014..000000000 --- a/lerobot/scripts/read_hiwonder_motors2.py +++ /dev/null @@ -1,37 +0,0 @@ -import time - -from lerobot.common.robot_devices.motors.hiwonder import ( - HiwonderMotorsBus, # assuming the class is saved in hiwonder_motors_bus.py -) - - -def main(): - # Replace '/dev/ttyUSB0' with your actual port - # Try connecting with 9600 baud rate - hw_bus = HiwonderMotorsBus("/dev/ttyUSB0") - hw_bus.connect() - # hw_bus.read_motor_id() - hw_bus.write_position(1, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(1) - hw_bus.write_position(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(1) - hw_bus.write_position(1, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(1) - - hw_bus.write_position(2, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(2) - hw_bus.write_position(2, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(2) - hw_bus.write_position(2, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms - time.sleep(3.0) # Wait for the movement to complete - hw_bus.read_position(2) - hw_bus.disconnect() - - -if __name__ == "__main__": - main()