Fix calibration

This commit is contained in:
Simon Alibert
2025-04-02 22:27:49 +02:00
parent 65569ba90e
commit 1ebd81552c
4 changed files with 154 additions and 83 deletions

View File

@@ -24,7 +24,7 @@ from enum import Enum
from lerobot.common.utils.encoding_utils import decode_twos_complement, encode_twos_complement
from ..motors_bus import Motor, MotorsBus, NameOrID, Value
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value
from .tables import (
AVAILABLE_BAUDRATES,
MODEL_BAUDRATE_TABLE,
@@ -102,8 +102,9 @@ class DynamixelMotorsBus(MotorsBus):
self,
port: str,
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
):
super().__init__(port, motors)
super().__init__(port, motors, calibration)
import dynamixel_sdk as dxl
self.port_handler = dxl.PortHandler(self.port)
@@ -113,12 +114,26 @@ class DynamixelMotorsBus(MotorsBus):
self._comm_success = dxl.COMM_SUCCESS
self._no_error = 0x00
def _configure_motors(self) -> None:
def configure_motors(self) -> None:
# By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
for id_ in self.ids:
self.write("Return_Delay_Time", id_, 0)
def _disable_torque(self, motors: list[NameOrID]) -> None:
for motor in motors:
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value)
def _enable_torque(self, motors: list[NameOrID]) -> None:
for motor in motors:
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
return encode_twos_complement(value, n_bytes)
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
return decode_twos_complement(value, n_bytes)
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
"""
On Dynamixel Motors:
@@ -132,12 +147,6 @@ class DynamixelMotorsBus(MotorsBus):
return half_turn_homings
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
return encode_twos_complement(value, n_bytes)
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
return decode_twos_complement(value, n_bytes)
@staticmethod
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
# Validate input

View File

@@ -19,7 +19,7 @@ from pprint import pformat
from lerobot.common.utils.encoding_utils import decode_sign_magnitude, encode_sign_magnitude
from ..motors_bus import Motor, MotorsBus, NameOrID, Value
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value
from .tables import (
AVAILABLE_BAUDRATES,
ENCODINGS,
@@ -82,8 +82,9 @@ class FeetechMotorsBus(MotorsBus):
self,
port: str,
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
):
super().__init__(port, motors)
super().__init__(port, motors, calibration)
import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port)
@@ -93,7 +94,7 @@ class FeetechMotorsBus(MotorsBus):
self._comm_success = scs.COMM_SUCCESS
self._no_error = 0x00
def _configure_motors(self) -> None:
def configure_motors(self) -> None:
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the
# 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0).
for id_ in self.ids:
@@ -112,6 +113,16 @@ class FeetechMotorsBus(MotorsBus):
return half_turn_homings
def _disable_torque(self, motors: list[NameOrID]) -> None:
for motor in motors:
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value)
self.write("Lock", motor, 0)
def _enable_torque(self, motors: list[NameOrID]) -> None:
for motor in motors:
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
self.write("Lock", motor, 1)
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
sign_bit = self.encodings.get(data_name)
return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value

View File

@@ -32,7 +32,7 @@ from deepdiff import DeepDiff
from tqdm import tqdm
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import _enter_pressed
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
NameOrID: TypeAlias = str | int
Value: TypeAlias = int | float
@@ -265,9 +265,11 @@ class MotorsBus(abc.ABC):
self,
port: str,
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
):
self.port = port
self.motors = motors
self.calibration = calibration if calibration else {}
self.port_handler: PortHandler
self.packet_handler: PacketHandler
@@ -276,8 +278,6 @@ class MotorsBus(abc.ABC):
self._comm_success: int
self._no_error: int
self.calibration: dict[str, MotorCalibration] = {}
self._id_to_model_dict = {m.id: m.model for m in self.motors.values()}
self._id_to_name_dict = {m.id: name for name, m in self.motors.items()}
self._model_nb_to_model_dict = {v: k for k, v in self.model_number_table.items()}
@@ -396,7 +396,6 @@ class MotorsBus(abc.ABC):
) from e
self.set_timeout()
self.calibration = self.get_calibration()
logger.debug(f"{self.__class__.__name__} connected.")
@classmethod
@@ -420,7 +419,37 @@ class MotorsBus(abc.ABC):
return baudrate_ids
@abc.abstractmethod
def _configure_motors(self) -> None:
def configure_motors(self) -> None:
pass
def disable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
pass
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
self._disable_torque(motors)
def enable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
pass
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
self._enable_torque(motors)
@abc.abstractmethod
def _enable_torque(self, motors: list[NameOrID]) -> None:
pass
@abc.abstractmethod
def _disable_torque(self, motors: list[NameOrID]) -> None:
pass
def set_timeout(self, timeout_ms: int | None = None):
@@ -439,39 +468,7 @@ class MotorsBus(abc.ABC):
if self.port_handler.getBaudRate() != baudrate:
raise OSError("Failed to write bus baud rate.")
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
"""This assumes motors present positions are roughly in the middle of their desired range"""
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
else:
raise TypeError(motors)
# Step 1: Set homing and min max to 0
self.reset_homing_ranges(motors)
# Step 2: Read Present_Position which will be Actual_Position since
# Present_Position = Actual_Position ± Homing_Offset (1)
# and Homing_Offset = 0 from step 1
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
# Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of
# 1 revolution.
# For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current
# Present_Position to be 2047. In that example:
# Present_Position = 2047 (2)
# Actual_Position = X (read in step 2)
# from (1) and (2):
# => Homing_Offset = ±(X - 2048)
homing_offsets = self._get_half_turn_homings(actual_positions)
for motor, offset in homing_offsets.items():
self.write("Homing_Offset", motor, offset, normalize=False)
return homing_offsets
def reset_homing_ranges(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
@@ -486,39 +483,11 @@ class MotorsBus(abc.ABC):
self.write("Min_Position_Limit", motor, 0, normalize=False)
self.write("Max_Position_Limit", motor, max_res, normalize=False)
def register_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None
) -> dict[NameOrID, dict[str, Value]]:
"""
This assumes that the homing offsets have been set such that all possible values in the range of
motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should
typically be called prior to this.
"""
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
@property
def is_calibrated(self) -> bool:
return self.calibration == self.read_calibration()
start_positions = self.sync_read("Present_Position", motors, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
while True:
positions = self.sync_read("Present_Position", motors, normalize=False)
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
if _enter_pressed():
break
for motor in motors:
self.write("Min_Position_Limit", motor, mins[motor], normalize=False)
self.write("Max_Position_Limit", motor, maxes[motor], normalize=False)
return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors}
def get_calibration(self) -> dict[str, MotorCalibration]:
def read_calibration(self) -> dict[str, MotorCalibration]:
offsets = self.sync_read("Homing_Offset", normalize=False)
mins = self.sync_read("Min_Position_Limit", normalize=False)
maxes = self.sync_read("Max_Position_Limit", normalize=False)
@@ -540,6 +509,83 @@ class MotorsBus(abc.ABC):
return calibration
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
for motor, calibration in calibration_dict.items():
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
self.calibration = calibration_dict
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
"""This assumes motors present positions are roughly in the middle of their desired range"""
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
else:
raise TypeError(motors)
# Step 1: Set homing and min max to 0
self.reset_calibration(motors)
# Step 2: Read Present_Position which will be Actual_Position since
# Present_Position = Actual_Position ± Homing_Offset (1)
# and Homing_Offset = 0 from step 1
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
# Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of
# 1 revolution.
# For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current
# Present_Position to be 2047. In that example:
# Present_Position = 2047 (2)
# Actual_Position = X (read in step 2)
# from (1) and (2):
# => Homing_Offset = ±(X - 2048)
homing_offsets = self._get_half_turn_homings(actual_positions)
for motor, offset in homing_offsets.items():
self.write("Homing_Offset", motor, offset)
return homing_offsets
def record_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""
This assumes that the homing offsets have been set such that all possible values in the range of
motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should
typically be called prior to this.
"""
if motors is None:
motors = self.names
elif isinstance(motors, (str, int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
start_positions = self.sync_read("Present_Position", motors, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
while True:
positions = self.sync_read("Present_Position", motors, normalize=False)
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for name in motors:
print(f"{name:<15} | {mins[name]:>6} | {positions[name]:>6} | {maxes[name]:>6}")
if enter_pressed():
break
if display_values:
# Move cursor up to overwrite the previous output
move_cursor_up(len(motors) + 3)
return mins, maxes
@abc.abstractmethod
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
pass

View File

@@ -232,5 +232,10 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool:
return False
def _enter_pressed() -> bool:
def enter_pressed() -> bool:
return select.select([sys.stdin], [], [], 0)[0] and sys.stdin.readline().strip() == ""
def move_cursor_up(lines):
"""Move the cursor up by a specified number of lines."""
print(f"\033[{lines}A", end="")