Add support for feetech scs series + various fixes

This commit is contained in:
Simon Alibert
2025-04-08 10:46:29 +02:00
parent 99c0938b42
commit e998dddcfa
5 changed files with 291 additions and 307 deletions

View File

@@ -21,19 +21,22 @@ from lerobot.common.utils.encoding_utils import decode_sign_magnitude, encode_si
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value
from .tables import (
AVAILABLE_BAUDRATES,
ENCODINGS,
FIRMWARE_VERSION,
MODEL_BAUDRATE_TABLE,
MODEL_CONTROL_TABLE,
MODEL_ENCODING_TABLE,
MODEL_NUMBER,
MODEL_NUMBER_TABLE,
MODEL_RESOLUTION,
NORMALIZATION_REQUIRED,
SCAN_BAUDRATES,
)
PROTOCOL_VERSION = 0
BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000
NORMALIZED_DATA = ["Goal_Position", "Present_Position"]
logger = logging.getLogger(__name__)
@@ -80,16 +83,14 @@ class FeetechMotorsBus(MotorsBus):
python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
"""
available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
available_baudrates = deepcopy(SCAN_BAUDRATES)
default_timeout = DEFAULT_TIMEOUT_MS
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
model_number_table = deepcopy(MODEL_NUMBER)
model_encoding_table = deepcopy(MODEL_ENCODING_TABLE)
model_number_table = deepcopy(MODEL_NUMBER_TABLE)
model_resolution_table = deepcopy(MODEL_RESOLUTION)
normalization_required = deepcopy(NORMALIZATION_REQUIRED)
# Feetech specific
encodings = deepcopy(ENCODINGS)
normalized_data = deepcopy(NORMALIZED_DATA)
def __init__(
self,
@@ -140,13 +141,25 @@ class FeetechMotorsBus(MotorsBus):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
self.write("Lock", motor, 1)
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
sign_bit = self.encodings.get(data_name)
return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
for id_ in ids_values:
model = self._id_to_model(id_)
encoding_table = self.model_encoding_table.get(model)
if encoding_table and data_name in encoding_table:
sign_bit = encoding_table[data_name]
ids_values[id_] = encode_sign_magnitude(ids_values[id_], sign_bit)
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
sign_bit = self.encodings.get(data_name)
return decode_sign_magnitude(value, sign_bit) if sign_bit is not None else value
return ids_values
def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
for id_ in ids_values:
model = self._id_to_model(id_)
encoding_table = self.model_encoding_table.get(model)
if encoding_table and data_name in encoding_table:
sign_bit = encoding_table[data_name]
ids_values[id_] = decode_sign_magnitude(ids_values[id_], sign_bit)
return ids_values
@staticmethod
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
@@ -220,15 +233,15 @@ class FeetechMotorsBus(MotorsBus):
return data_list, scs.COMM_RX_CORRUPT
# find packet header
for id_ in range(0, (rx_length - 1)):
if (rxpacket[id_] == 0xFF) and (rxpacket[id_ + 1] == 0xFF):
for idx in range(0, (rx_length - 1)):
if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
break
if id_ == 0: # found at the beginning of the packet
if idx == 0: # found at the beginning of the packet
# calculate checksum
checksum = 0
for id_ in range(2, status_length - 1): # except header & checksum
checksum += rxpacket[id_]
for idx in range(2, status_length - 1): # except header & checksum
checksum += rxpacket[idx]
checksum = scs.SCS_LOBYTE(~checksum)
if rxpacket[status_length - 1] == checksum:
@@ -247,34 +260,71 @@ class FeetechMotorsBus(MotorsBus):
rx_length = rx_length - 2
else:
# remove unnecessary packets
del rxpacket[0:id_]
rx_length = rx_length - id_
del rxpacket[0:idx]
rx_length = rx_length - idx
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
for n_try in range(1 + num_retry):
ids_status, comm = self._broadcast_ping()
if self._is_comm_success(comm):
break
logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})")
logger.debug(self.packet_handler.getRxPacketError(comm))
logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
logger.debug(self.packet_handler.getTxRxResult(comm))
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getRxPacketError(comm))
return ids_status if ids_status else None
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
if ids_errors:
display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}")
comm, model_numbers = self._sync_read(
"Model_Number", list(ids_status), model="scs_series", num_retry=num_retry
)
return self._get_model_number(list(ids_status), raise_on_error)
def _get_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
# comm, major = self._sync_read(*FIRMWARE_MAJOR_VERSION, motor_ids)
# if not self._is_comm_success(comm):
# if raise_on_error:
# raise ConnectionError(self.packet_handler.getTxRxResult(comm))
# return
# comm, minor = self._sync_read(*FIRMWARE_MINOR_VERSION, motor_ids)
# if not self._is_comm_success(comm):
# if raise_on_error:
# raise ConnectionError(self.packet_handler.getTxRxResult(comm))
# return
# return {id_: f"{major[id_]}.{minor[id_]}" for id_ in motor_ids}
comm, firmware_versions = self._sync_read(*FIRMWARE_VERSION, motor_ids)
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getRxPacketError(comm))
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
return model_numbers if model_numbers else None
return firmware_versions
def _get_model_number(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
# comm, major = self._sync_read(*MODEL_MAJOR_VERSION, motor_ids)
# if not self._is_comm_success(comm):
# if raise_on_error:
# raise ConnectionError(self.packet_handler.getTxRxResult(comm))
# return
# comm, minor = self._sync_read(*MODEL_MINOR_VERSION, motor_ids)
# if not self._is_comm_success(comm):
# if raise_on_error:
# raise ConnectionError(self.packet_handler.getTxRxResult(comm))
# return
# return {id_: f"{major[id_]}.{minor[id_]}" for id_ in motor_ids}
comm, model_numbers = self._sync_read(*MODEL_NUMBER, motor_ids)
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
return model_numbers

View File

@@ -1,10 +1,22 @@
FIRMWARE_MAJOR_VERSION = (0, 1)
FIRMWARE_MINOR_VERSION = (1, 1)
MODEL_MAJOR_VERSION = (3, 1)
MODEL_MINOR_VERSION = (4, 1)
FIRMWARE_VERSION = (0, 2)
MODEL_NUMBER = (3, 2)
# See this link for STS3215 Memory Table:
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
# data_name: (address, size_byte)
SCS_SERIES_CONTROL_TABLE = {
STS_SMS_SERIES_CONTROL_TABLE = {
# EPROM
"Firmware_Version": (0, 2),
"Model_Number": (3, 2),
"Firmware_Version": FIRMWARE_VERSION, # read-only
"Model_Number": MODEL_NUMBER, # read-only
# "Firmware_Major_Version": FIRMWARE_MAJOR_VERSION, # read-only
# "Firmware_Minor_Version": FIRMWARE_MINOR_VERSION, # read-only
# "Model_Major_Version": MODEL_MAJOR_VERSION, # read-only
# "Model_Minor_Version": MODEL_MINOR_VERSION,
"ID": (5, 1),
"Baud_Rate": (6, 1),
"Return_Delay_Time": (7, 1),
@@ -42,18 +54,75 @@ SCS_SERIES_CONTROL_TABLE = {
"Goal_Speed": (46, 2),
"Torque_Limit": (48, 2),
"Lock": (55, 1),
"Present_Position": (56, 2),
"Present_Speed": (58, 2),
"Present_Load": (60, 2),
"Present_Voltage": (62, 1),
"Present_Temperature": (63, 1),
"Status": (65, 1),
"Moving": (66, 1),
"Present_Current": (69, 2),
"Present_Position": (56, 2), # read-only
"Present_Speed": (58, 2), # read-only
"Present_Load": (60, 2), # read-only
"Present_Voltage": (62, 1), # read-only
"Present_Temperature": (63, 1), # read-only
"Status": (65, 1), # read-only
"Moving": (66, 1), # read-only
"Present_Current": (69, 2), # read-only
# Not in the Memory Table
"Maximum_Acceleration": (85, 2),
}
SCS_SERIES_CONTROL_TABLE = {
# EPROM
"Firmware_Version": FIRMWARE_VERSION, # read-only
"Model_Number": MODEL_NUMBER, # read-only
# "Firmware_Major_Version": FIRMWARE_MAJOR_VERSION, # read-only
# "Firmware_Minor_Version": FIRMWARE_MINOR_VERSION, # read-only
# "Model_Major_Version": MODEL_MAJOR_VERSION, # read-only
# "Model_Minor_Version": MODEL_MINOR_VERSION,
"ID": (5, 1),
"Baud_Rate": (6, 1),
"Return_Delay": (7, 1),
"Response_Status_Level": (8, 1),
"Min_Position_Limit": (9, 2),
"Max_Position_Limit": (11, 2),
"Max_Temperature_Limit": (13, 1),
"Max_Voltage_Limit": (14, 1),
"Min_Voltage_Limit": (15, 1),
"Max_Torque_Limit": (16, 2),
"Phase": (18, 1),
"Unloading_Condition": (19, 1),
"LED_Alarm_Condition": (20, 1),
"P_Coefficient": (21, 1),
"D_Coefficient": (22, 1),
"I_Coefficient": (23, 1),
"Minimum_Startup_Force": (24, 2),
"CW_Dead_Zone": (26, 1),
"CCW_Dead_Zone": (27, 1),
"Protective_Torque": (37, 1),
"Protection_Time": (38, 1),
# SRAM
"Torque_Enable": (40, 1),
"Acceleration": (41, 1),
"Goal_Position": (42, 2),
"Running_Time": (44, 2),
"Goal_Speed": (46, 2),
"Lock": (48, 1),
"Present_Position": (56, 2), # read-only
"Present_Speed": (58, 2), # read-only
"Present_Load": (60, 2), # read-only
"Present_Voltage": (62, 1), # read-only
"Present_Temperature": (63, 1), # read-only
"Sync_Write_Flag": (64, 1), # read-only
"Status": (65, 1), # read-only
"Moving": (66, 1), # read-only
}
STS_SMS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
2: 250_000,
3: 128_000,
4: 115_200,
5: 57_600,
6: 38_400,
7: 19_200,
}
SCS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
@@ -66,34 +135,52 @@ SCS_SERIES_BAUDRATE_TABLE = {
}
MODEL_CONTROL_TABLE = {
"sts_series": STS_SMS_SERIES_CONTROL_TABLE,
"scs_series": SCS_SERIES_CONTROL_TABLE,
"sts3215": SCS_SERIES_CONTROL_TABLE,
"sms_series": STS_SMS_SERIES_CONTROL_TABLE,
"sts3215": STS_SMS_SERIES_CONTROL_TABLE,
"sts3250": STS_SMS_SERIES_CONTROL_TABLE,
"scs0009": SCS_SERIES_CONTROL_TABLE,
"sm8512bl": STS_SMS_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"scs_series": 4096,
"sts_series": 4096,
"sms_series": 4096,
"scs_series": 1024,
"sts3215": 4096,
}
# {model: model_number}
MODEL_NUMBER = {
"sts3215": 777,
"sts3250": 4096,
"sm8512bl": 4096,
"scs0009": 1024,
}
MODEL_BAUDRATE_TABLE = {
"sts_series": STS_SMS_SERIES_BAUDRATE_TABLE,
"sms_series": STS_SMS_SERIES_BAUDRATE_TABLE,
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
"sm8512bl": STS_SMS_SERIES_BAUDRATE_TABLE,
"sts3215": STS_SMS_SERIES_BAUDRATE_TABLE,
"sts3250": STS_SMS_SERIES_BAUDRATE_TABLE,
"scs0009": SCS_SERIES_BAUDRATE_TABLE,
}
NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"]
# Sign-Magnitude encoding bits
ENCODINGS = {
STS_SMS_SERIES_ENCODINGS_TABLE = {
"Homing_Offset": 11,
"Goal_Speed": 15,
}
AVAILABLE_BAUDRATES = [
MODEL_ENCODING_TABLE = {
"sts_series": STS_SMS_SERIES_ENCODINGS_TABLE,
"sms_series": STS_SMS_SERIES_ENCODINGS_TABLE,
"scs_series": {},
"sts3215": STS_SMS_SERIES_ENCODINGS_TABLE,
"sts3250": STS_SMS_SERIES_ENCODINGS_TABLE,
"sm8512bl": STS_SMS_SERIES_ENCODINGS_TABLE,
"scs0009": {},
}
SCAN_BAUDRATES = [
4_800,
9_600,
14_400,
@@ -106,3 +193,11 @@ AVAILABLE_BAUDRATES = [
500_000,
1_000_000,
]
# {model: model_number} TODO
MODEL_NUMBER_TABLE = {
"sts3215": 777,
"sts3250": None,
"sm8512bl": None,
"scs0009": None,
}