Hack feetech firmware bug
This commit is contained in:
@@ -24,7 +24,7 @@ from enum import Enum
|
|||||||
|
|
||||||
from lerobot.common.utils.encoding_utils import decode_twos_complement, encode_twos_complement
|
from lerobot.common.utils.encoding_utils import decode_twos_complement, encode_twos_complement
|
||||||
|
|
||||||
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value
|
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address
|
||||||
from .tables import (
|
from .tables import (
|
||||||
AVAILABLE_BAUDRATES,
|
AVAILABLE_BAUDRATES,
|
||||||
MODEL_BAUDRATE_TABLE,
|
MODEL_BAUDRATE_TABLE,
|
||||||
@@ -192,3 +192,20 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
return data_list if data_list else None
|
return data_list if data_list else None
|
||||||
|
|
||||||
return {id_: data[0] for id_, data in data_list.items()}
|
return {id_: data[0] for id_, data in data_list.items()}
|
||||||
|
|
||||||
|
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
||||||
|
model = self._id_to_model(motor_id)
|
||||||
|
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
||||||
|
value = self._encode_value(value, data_name, n_bytes)
|
||||||
|
data = self._split_int_to_bytes(value, n_bytes)
|
||||||
|
|
||||||
|
for n_try in range(1 + num_retry):
|
||||||
|
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
|
||||||
|
if self._is_comm_success(comm):
|
||||||
|
break
|
||||||
|
logger.debug(
|
||||||
|
f"Failed to write '{data_name}' ({addr=} {n_bytes=}) on {motor_id=} with '{value}' ({n_try=})"
|
||||||
|
)
|
||||||
|
logger.debug(self.packet_handler.getRxPacketError(comm))
|
||||||
|
|
||||||
|
return comm, error
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ from pprint import pformat
|
|||||||
|
|
||||||
from lerobot.common.utils.encoding_utils import decode_sign_magnitude, encode_sign_magnitude
|
from lerobot.common.utils.encoding_utils import decode_sign_magnitude, encode_sign_magnitude
|
||||||
|
|
||||||
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value
|
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address
|
||||||
from .tables import (
|
from .tables import (
|
||||||
AVAILABLE_BAUDRATES,
|
AVAILABLE_BAUDRATES,
|
||||||
ENCODINGS,
|
ENCODINGS,
|
||||||
@@ -261,3 +261,36 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
return model_numbers if model_numbers else None
|
return model_numbers if model_numbers else None
|
||||||
|
|
||||||
return model_numbers
|
return model_numbers
|
||||||
|
|
||||||
|
def _is_eprom(self, address: int) -> bool:
|
||||||
|
"""
|
||||||
|
HACK: because of https://gitee.com/ftservo/SCServoSDK/issues/IBY2S6
|
||||||
|
When writing to EPROM on for Feetech motors with Lock=0, we need to:
|
||||||
|
- 1. write several times
|
||||||
|
- 2. reset serial's buffer
|
||||||
|
"""
|
||||||
|
return address < 40
|
||||||
|
|
||||||
|
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
||||||
|
model = self._id_to_model(motor_id)
|
||||||
|
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
||||||
|
value = self._encode_value(value, data_name, n_bytes)
|
||||||
|
data = self._split_int_to_bytes(value, n_bytes)
|
||||||
|
|
||||||
|
if self._is_eprom(addr): # HACK
|
||||||
|
num_retry = max(num_retry, 5)
|
||||||
|
|
||||||
|
for n_try in range(1 + num_retry):
|
||||||
|
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
|
||||||
|
if self._is_comm_success(comm):
|
||||||
|
break
|
||||||
|
logger.debug(
|
||||||
|
f"Failed to write '{data_name}' ({addr=} {n_bytes=}) on {motor_id=} with '{value}' ({n_try=})"
|
||||||
|
)
|
||||||
|
logger.debug(self.packet_handler.getRxPacketError(comm))
|
||||||
|
|
||||||
|
if self._is_eprom(addr): # HACK
|
||||||
|
self.port_handler.ser.reset_output_buffer()
|
||||||
|
self.port_handler.ser.reset_input_buffer()
|
||||||
|
|
||||||
|
return comm, error
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
|
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
|
||||||
# data_name: (address, size_byte)
|
# data_name: (address, size_byte)
|
||||||
SCS_SERIES_CONTROL_TABLE = {
|
SCS_SERIES_CONTROL_TABLE = {
|
||||||
|
# EPROM
|
||||||
"Firmware_Version": (0, 2),
|
"Firmware_Version": (0, 2),
|
||||||
"Model_Number": (3, 2),
|
"Model_Number": (3, 2),
|
||||||
"ID": (5, 1),
|
"ID": (5, 1),
|
||||||
@@ -33,6 +34,7 @@ SCS_SERIES_CONTROL_TABLE = {
|
|||||||
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
|
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
|
||||||
"Over_Current_Protection_Time": (38, 1),
|
"Over_Current_Protection_Time": (38, 1),
|
||||||
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
|
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
|
||||||
|
# SRAM
|
||||||
"Torque_Enable": (40, 1),
|
"Torque_Enable": (40, 1),
|
||||||
"Acceleration": (41, 1),
|
"Acceleration": (41, 1),
|
||||||
"Goal_Position": (42, 2),
|
"Goal_Position": (42, 2),
|
||||||
|
|||||||
@@ -888,28 +888,20 @@ class MotorsBus(abc.ABC):
|
|||||||
f"\n{self.packet_handler.getRxPacketError(error)}"
|
f"\n{self.packet_handler.getRxPacketError(error)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
||||||
model = self._id_to_model(motor_id)
|
pass
|
||||||
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
|
||||||
value = self._encode_value(value, data_name, n_bytes)
|
|
||||||
data = self._split_int_to_bytes(value, n_bytes)
|
|
||||||
|
|
||||||
for n_try in range(1 + num_retry):
|
def disconnect(self, disable_torque: bool = True) -> None:
|
||||||
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
|
|
||||||
if self._is_comm_success(comm):
|
|
||||||
break
|
|
||||||
logger.debug(
|
|
||||||
f"Failed to write '{data_name}' ({addr=} {n_bytes=}) on {motor_id=} with '{value}' ({n_try=})"
|
|
||||||
)
|
|
||||||
logger.debug(self.packet_handler.getRxPacketError(comm))
|
|
||||||
|
|
||||||
return comm, error
|
|
||||||
|
|
||||||
def disconnect(self) -> None:
|
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
|
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if disable_torque:
|
||||||
|
self.port_handler.clearPort()
|
||||||
|
self.port_handler.is_using = False
|
||||||
|
self.disable_torque()
|
||||||
|
|
||||||
self.port_handler.closePort()
|
self.port_handler.closePort()
|
||||||
logger.debug(f"{self.__class__.__name__} disconnected.")
|
logger.debug(f"{self.__class__.__name__} disconnected.")
|
||||||
|
|||||||
Reference in New Issue
Block a user