Things are looking good.

This commit is contained in:
jess-moss
2024-08-28 14:42:39 -05:00
parent 8e05497b11
commit 699adc2130
2 changed files with 142 additions and 104 deletions

View File

@@ -1,9 +1,64 @@
import time
import traceback
import numpy as np
import serial
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
BAUDRATE = 115_200
TIMEOUT_S = 1
COMMAND_HEADER = [0x55, 0x55] # Header for all commands sent to Hiwonder motors
BYTE_MASK = 0xFF
# https://drive.google.com/file/d/1DQGBBng8UFziv5hg15qmyvakItG3w6Ss/view?usp=sharing
# data_name: (command, length)
HIWONDER_CONTROL_TABLE = {
"SERVO_MOVE_TIME_WRITE": (0x01, 0x07),
"SERVO_MOVE_TIME_READ": (0x02, 0x03),
"SERVO_MOVE_TIME_WAIT_WRITE": (0x07, 0x07),
"SERVO_MOVE_TIME_WAIT_READ": (0x08, 0x03),
"SERVO_MOVE_START": (0x0B, 0x03),
"SERVO_MOVE_STOP": (0x0C, 0x03),
"SERVO_ID_WRITE": (0x0D, 0x04),
"SERVO_ID_READ": (0x0E, 0x03),
"SERVO_ANGLE_OFFSET_ADJUST": (0x11, 0x04),
"SERVO_ANGLE_OFFSET_WRITE": (0x12, 0x03),
"SERVO_ANGLE_OFFSET_READ": (0x13, 0x03),
"SERVO_ANGLE_LIMIT_WRITE": (0x14, 0x07),
"SERVO_ANGLE_LIMIT_READ": (0x15, 0x03),
"SERVO_VIN_LIMIT_WRITE": (0x16, 0x07),
"SERVO_VIN_LIMIT_READ": (0x17, 0x03),
"SERVO_TEMP_MAX_LIMIT_WRITE": (0x18, 0x04),
"SERVO_TEMP_MAX_LIMIT_READ": (0x19, 0x03),
"SERVO_TEMP_READ": (0x1A, 0x03),
"SERVO_VIN_READ": (0x1B, 0x03),
"SERVO_POS_READ": (0x1C, 0x03),
"SERVO_OR_MOTOR_MODE_WRITE": (0x1D, 0x07),
"SERVO_OR_MOTOR_MODE_READ": (0x1E, 0x03),
"SERVO_LOAD_OR_UNLOAD_WRITE": (0x1F, 0x04),
"SERVO_LOAD_OR_UNLOAD_READ": (0x20, 0x03),
"SERVO_LED_CTRL_WRITE": (0x21, 0x04),
"SERVO_LED_CTRL_READ": (0x22, 0x03),
"SERVO_LED_ERROR_WRITE": (0x23, 0x04),
"SERVO_LED_ERROR_READ": (0x24, 0x03),
}
def calculate_checksum(data):
"""Calculate the checksum for the given data."""
checksum = ~(sum(data)) & BYTE_MASK
return checksum
def low_byte(value):
"""Extract the low byte of a 16-bit integer."""
return value & BYTE_MASK
def high_byte(value):
"""Extract the high byte of a 16-bit integer."""
return (value >> 8) & BYTE_MASK
class HiwonderMotorsBus:
@@ -25,21 +80,22 @@ class HiwonderMotorsBus:
)
motors_bus.connect()
position = motors_bus.read("Present_Position")
position = motors_bus.read()
# move from a few motor steps as an example
few_steps = 30
motors_bus.write("Goal_Position", position + few_steps)
motors_bus.write(position + few_steps)
# when done, consider disconnecting
motors_bus.disconnect()
```
"""
def __init__(self, port):
def __init__(self, port: str, motors: dict[str, tuple[int, str]]):
self.port = port
self.serial = None
self.is_connected = False
self.motors = motors
def connect(self):
"""Open the serial port and establish a connection."""
@@ -48,8 +104,7 @@ class HiwonderMotorsBus:
f"HiwonderMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
)
try:
# TODO made baudrate, timeout constant
self.serial = serial.Serial(self.port, baudrate=115200, timeout=1)
self.serial = serial.Serial(self.port, baudrate=BAUDRATE, timeout=TIMEOUT_S)
self.is_connected = True
print("Connection established")
except Exception:
@@ -64,79 +119,99 @@ class HiwonderMotorsBus:
self.is_connected = False
print("Connection closed")
def calculate_checksum(self, data):
"""Calculate the checksum for the given data."""
checksum = ~(sum(data)) & 0xFF
return checksum
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
def write_position(self, motor_id, position, duration):
"""Send a command to move the motor to a specific position over a specified duration."""
def write(self, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected:
print("Serial port not connected.")
return
raise RobotDeviceAlreadyConnectedError(f"HiwonderMotorsBus({self.port}) is not connected.")
# Command structure: 55 55 ID LEN CMD P1 P2 P3 P4 CHK
cmd = 0x01 # Assuming command for move
length = 0x07 # Length of the remaining bytes
pos_low = position & 0xFF
pos_high = (position >> 8) & 0xFF
dur_low = duration & 0xFF
dur_high = (duration >> 8) & 0xFF
if motor_names is None:
motor_names = self.motor_names
# Prepare the packet
data = [motor_id, length, cmd, pos_low, pos_high, dur_low, dur_high]
checksum = self.calculate_checksum(data)
command = [0x55, 0x55] + data + [checksum]
if isinstance(motor_names, str):
motor_names = [motor_names]
# Send the packet
self.serial.write(bytearray(command))
print(f"Position write command sent: {' '.join(f'{byte:02X}' for byte in command)}")
if isinstance(values, (int, float)):
values = [values] * len(
motor_names
) # Replicate value for each motor if a single value is provided
def read_position(self, motor_id):
"""Send a command to read the current position of the motor."""
motor_commands = zip(motor_names, values, strict=False)
for motor_name, value in motor_commands:
motor_id, _ = self.motors[motor_name] # Using only the ID for the command
duration = 0 # Get to the next position as soon as possible
# Command structure: 55 55 ID LEN CMD P1 P2 P3 P4 CHK
# Retrieve command and length from the control table
cmd, length = HIWONDER_CONTROL_TABLE["SERVO_MOVE_TIME_WRITE"]
# Prepare the packet
data = [
motor_id,
length,
cmd,
low_byte(value),
high_byte(value),
low_byte(duration),
high_byte(duration),
]
checksum = calculate_checksum(data)
command = COMMAND_HEADER + data + [checksum]
self.serial.write(bytearray(command))
def read(self, motor_names: str | list[str] | None = None):
"""Send a command to read the current position of the motors."""
if not self.is_connected:
print("Serial port not connected.")
return
raise RobotDeviceNotConnectedError(
f"HiwonderMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
)
# Command structure for reading position: 55 55 ID LEN CMD CHK
cmd = 0x1C # Command value for reading position
length = 0x03 # Length is always 3 for the read position command
if motor_names is None:
motor_names = self.motor_names
# Prepare the command packet
data = [motor_id, length, cmd]
checksum = self.calculate_checksum(data)
command = [0x55, 0x55] + data + [checksum]
if isinstance(motor_names, str):
motor_names = [motor_names]
# Send the command
self.serial.write(bytearray(command))
print(f"Position read command sent: {' '.join(f'{byte:02X}' for byte in command)}")
motor_ids = [self.motors[name][0] for name in motor_names] # Extract motor IDs based on names
# Wait for response and handle it
try:
response = self.serial.read(8) # Adjust size based on expected response
if len(response) == 8:
header1, header2, resp_id, resp_len, resp_cmd, param1, param2, resp_chk = response
# Calculate expected checksum and compare
expected_checksum = self.calculate_checksum(response[2:-1])
if resp_chk == expected_checksum:
# Combine param1 and param2 to form the actual position
position = param1 + (param2 << 8)
print(f"Current position of motor {motor_id}: {position}")
return position
positions = {}
for motor_name, motor_id in zip(motor_names, motor_ids, strict=False):
# Command structure for reading position: 55 55 ID LEN CMD CHK
# Retrieve command and length from the control table
cmd, length = HIWONDER_CONTROL_TABLE["SERVO_POS_READ"]
# Prepare the command packet
data = [motor_id, length, cmd]
checksum = calculate_checksum(data)
command = COMMAND_HEADER + data + [checksum]
# Send the command
self.serial.write(bytearray(command))
# Wait for response and handle it
try:
response = self.serial.read(8) # Adjust size based on expected response
if len(response) == 8:
header1, header2, resp_id, resp_len, resp_cmd, param1, param2, resp_chk = response
# Calculate expected checksum and compare
expected_checksum = calculate_checksum(response[2:-1])
if resp_chk == expected_checksum:
# Combine param1 and param2 to form the actual position
position = param1 + (param2 << 8)
positions[motor_name] = position # Store position with motor ID as the key
else:
print(f"Checksum mismatch for motor {motor_id}.")
else:
print("Checksum mismatch.")
else:
print("No response or invalid response")
except Exception as e:
print(f"Failed to read position: {e}")
print(f"No response or invalid response for motor {motor_id}.")
except Exception as e:
print(f"Failed to read position for motor {motor_id}: {str(e)}")
return positions
# Example usage
if __name__ == "__main__":
hw_bus = HiwonderMotorsBus("/dev/ttyUSB0") # Adjust as necessary for your setup
hw_bus.connect()
hw_bus.send_command(1, 1000, 2000) # Move motor ID 1 to position 1000 in 2000 ms
time.sleep(2.5) # Wait for the movement to complete
hw_bus.send_command(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms
time.sleep(2.5) # Wait for the movement to complete
hw_bus.disconnect()
pass

View File

@@ -1,37 +0,0 @@
import time
from lerobot.common.robot_devices.motors.hiwonder import (
HiwonderMotorsBus, # assuming the class is saved in hiwonder_motors_bus.py
)
def main():
# Replace '/dev/ttyUSB0' with your actual port
# Try connecting with 9600 baud rate
hw_bus = HiwonderMotorsBus("/dev/ttyUSB0")
hw_bus.connect()
# hw_bus.read_motor_id()
hw_bus.write_position(1, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(1)
hw_bus.write_position(1, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(1)
hw_bus.write_position(1, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(1)
hw_bus.write_position(2, 10, 2000) # Move motor ID 1 to position 1000 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(2)
hw_bus.write_position(2, 500, 2000) # Move motor ID 1 to position 500 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(2)
hw_bus.write_position(2, 1000, 2000) # Move motor ID 1 to position 500 in 2000 ms
time.sleep(3.0) # Wait for the movement to complete
hw_bus.read_position(2)
hw_bus.disconnect()
if __name__ == "__main__":
main()