From d6007c6e7d2c4ded400bd320fac52d078719a3b4 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 30 Mar 2025 15:41:39 +0200 Subject: [PATCH] Add calibration utilities --- lerobot/common/motors/dynamixel/dynamixel.py | 75 +++++++-- lerobot/common/motors/feetech/feetech.py | 72 +++++++-- lerobot/common/motors/feetech/tables.py | 18 ++- lerobot/common/motors/motors_bus.py | 160 ++++++++++++++++--- lerobot/common/utils/utils.py | 6 + 5 files changed, 277 insertions(+), 54 deletions(-) diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index 5db015ef..f7e4569c 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -22,19 +22,50 @@ import logging from copy import deepcopy from enum import Enum -from ..motors_bus import Motor, MotorsBus -from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_NUMBER, MODEL_RESOLUTION +from ..motors_bus import Motor, MotorsBus, NameOrID, Value +from .tables import ( + AVAILABLE_BAUDRATES, + MODEL_BAUDRATE_TABLE, + MODEL_CONTROL_TABLE, + MODEL_NUMBER, + MODEL_RESOLUTION, +) PROTOCOL_VERSION = 2.0 BAUDRATE = 1_000_000 DEFAULT_TIMEOUT_MS = 1000 -CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"] +NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"] CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"] logger = logging.getLogger(__name__) +def encode_twos_complement(value: int, n_bytes: int): + if value >= 0: + return value + + bit_width = n_bytes * 8 + min_val = -(1 << (bit_width - 1)) + max_val = (1 << (bit_width - 1)) - 1 + + if not (min_val <= value <= max_val): + raise ValueError( + f"Value {value} out of range for {n_bytes}-byte two's complement: [{min_val}, {max_val}]" + ) + + return (1 << bit_width) + value + + +def decode_twos_complement(value: int, n_bytes: int) -> int: + # https://en.wikipedia.org/wiki/Two%27s_complement + bits = n_bytes * 8 + sign_bit = 1 << (bits - 1) + if value & sign_bit: + value -= 1 << bits + return value + + class OperatingMode(Enum): # DYNAMIXEL only controls current(torque) regardless of speed and position. This mode is ideal for a # gripper or a system that only uses current(torque) control or a system that has additional @@ -82,12 +113,13 @@ class DynamixelMotorsBus(MotorsBus): https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20 """ - model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) - model_resolution_table = deepcopy(MODEL_RESOLUTION) - model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) - model_number_table = deepcopy(MODEL_NUMBER) - calibration_required = deepcopy(CALIBRATION_REQUIRED) + available_baudrates = deepcopy(AVAILABLE_BAUDRATES) default_timeout = DEFAULT_TIMEOUT_MS + model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) + model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) + model_number_table = deepcopy(MODEL_NUMBER) + model_resolution_table = deepcopy(MODEL_RESOLUTION) + normalization_required = deepcopy(NORMALIZATION_REQUIRED) def __init__( self, @@ -110,14 +142,33 @@ class DynamixelMotorsBus(MotorsBus): for id_ in self.ids: self.write("Return_Delay_Time", id_, 0) - def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: + def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: + """ + On Dynamixel Motors: + Present_Position = Actual_Position + Homing_Offset + """ + half_turn_homings = {} + for motor, pos in positions.items(): + model = self._get_motor_model(motor) + max_res = self.model_resolution_table[model] - 1 + half_turn_homings[motor] = int(max_res / 2) - pos + + return half_turn_homings + + def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]: # TODO return ids_values - def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]: + def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]: # TODO return ids_values + def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + return encode_twos_complement(value, n_bytes) + + def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + return decode_twos_complement(value, n_bytes) + @staticmethod def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: # Validate input @@ -154,11 +205,11 @@ class DynamixelMotorsBus(MotorsBus): if self._is_comm_success(comm): break logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})") - logger.debug(self.packet_handler.getRxPacketError(comm)) + logger.debug(self.packet_handler.getTxRxResult(comm)) if not self._is_comm_success(comm): if raise_on_error: - raise ConnectionError(self.packet_handler.getRxPacketError(comm)) + raise ConnectionError(self.packet_handler.getTxRxResult(comm)) return data_list if data_list else None diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index 54f86824..6eb37314 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -17,13 +17,15 @@ from copy import deepcopy from enum import Enum from pprint import pformat -from ..motors_bus import Motor, MotorsBus +from ..motors_bus import Motor, MotorsBus, NameOrID, Value from .tables import ( - CALIBRATION_REQUIRED, + AVAILABLE_BAUDRATES, + ENCODINGS, MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_NUMBER, MODEL_RESOLUTION, + NORMALIZATION_REQUIRED, ) PROTOCOL_VERSION = 0 @@ -33,6 +35,29 @@ DEFAULT_TIMEOUT_MS = 1000 logger = logging.getLogger(__name__) +def encode_sign_magnitude(value: int, sign_bit_index: int): + """ + https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude + """ + max_magnitude = (1 << sign_bit_index) - 1 + magnitude = abs(value) + if magnitude > max_magnitude: + raise ValueError(f"Magnitude {magnitude} exceeds {max_magnitude} (max for {sign_bit_index=})") + + direction_bit = 1 if value < 0 else 0 + return (direction_bit << sign_bit_index) | magnitude + + +def decode_sign_magnitude(encoded_value: int, sign_bit_index: int): + """ + https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude + """ + direction_bit = (encoded_value >> sign_bit_index) & 1 + magnitude_mask = (1 << sign_bit_index) - 1 + magnitude = encoded_value & magnitude_mask + return -magnitude if direction_bit else magnitude + + class OperatingMode(Enum): # position servo mode POSITION = 0 @@ -63,12 +88,16 @@ class FeetechMotorsBus(MotorsBus): python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk. """ - model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) - model_resolution_table = deepcopy(MODEL_RESOLUTION) - model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) - model_number_table = deepcopy(MODEL_NUMBER) - calibration_required = deepcopy(CALIBRATION_REQUIRED) + available_baudrates = deepcopy(AVAILABLE_BAUDRATES) default_timeout = DEFAULT_TIMEOUT_MS + model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) + model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) + model_number_table = deepcopy(MODEL_NUMBER) + model_resolution_table = deepcopy(MODEL_RESOLUTION) + normalization_required = deepcopy(NORMALIZATION_REQUIRED) + + # Feetech specific + encodings = deepcopy(ENCODINGS) def __init__( self, @@ -89,16 +118,37 @@ class FeetechMotorsBus(MotorsBus): # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the # 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0). for id_ in self.ids: - self.write("Return_Delay", id_, 0) + self.write("Return_Delay_Time", id_, 0) - def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: + def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: + """ + On Feetech Motors: + Present_Position = Actual_Position - Homing_Offset + """ + half_turn_homings = {} + for motor, pos in positions.items(): + model = self._get_motor_model(motor) + max_res = self.model_resolution_table[model] - 1 + half_turn_homings[motor] = pos - int(max_res / 2) + + return half_turn_homings + + def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: # TODO return ids_values - def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]: + def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]: # TODO return ids_values + def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + sign_bit = self.encodings.get(data_name) + return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value + + def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + sign_bit = self.encodings.get(data_name) + return decode_sign_magnitude(value, sign_bit) if sign_bit is not None else value + @staticmethod def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: # Validate input @@ -114,8 +164,6 @@ class FeetechMotorsBus(MotorsBus): import scservo_sdk as scs - # Note: No need to convert back into unsigned int, since this byte preprocessing - # already handles it for us. if n_bytes == 1: data = [value] elif n_bytes == 2: diff --git a/lerobot/common/motors/feetech/tables.py b/lerobot/common/motors/feetech/tables.py index a947611d..0ea0962b 100644 --- a/lerobot/common/motors/feetech/tables.py +++ b/lerobot/common/motors/feetech/tables.py @@ -6,10 +6,10 @@ SCS_SERIES_CONTROL_TABLE = { "Model_Number": (3, 2), "ID": (5, 1), "Baud_Rate": (6, 1), - "Return_Delay": (7, 1), + "Return_Delay_Time": (7, 1), "Response_Status_Level": (8, 1), - "Min_Angle_Limit": (9, 2), - "Max_Angle_Limit": (11, 2), + "Min_Position_Limit": (9, 2), + "Max_Position_Limit": (11, 2), "Max_Temperature_Limit": (13, 1), "Max_Voltage_Limit": (14, 1), "Min_Voltage_Limit": (15, 1), @@ -25,8 +25,8 @@ SCS_SERIES_CONTROL_TABLE = { "CCW_Dead_Zone": (27, 1), "Protection_Current": (28, 2), "Angular_Resolution": (30, 1), - "Offset": (31, 2), - "Mode": (33, 1), + "Homing_Offset": (31, 2), + "Operating_Mode": (33, 1), "Protective_Torque": (34, 1), "Protection_Time": (35, 1), "Overload_Torque": (36, 1), @@ -83,6 +83,14 @@ MODEL_BAUDRATE_TABLE = { "sts3215": SCS_SERIES_BAUDRATE_TABLE, } +NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"] + +# Sign-Magnitude encoding bits +ENCODINGS = { + "Homing_Offset": 11, + "Goal_Speed": 15, +} + AVAILABLE_BAUDRATES = [ 4800, 9600, diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 1dde8e77..7389f88a 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -32,6 +32,7 @@ from deepdiff import DeepDiff from tqdm import tqdm from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.utils.utils import _enter_pressed NameOrID: TypeAlias = str | int Value: TypeAlias = int | float @@ -243,12 +244,13 @@ class MotorsBus(abc.ABC): ``` """ - model_ctrl_table: dict[str, dict] - model_resolution_table: dict[str, int] - model_baudrate_table: dict[str, dict] - model_number_table: dict[str, int] - calibration_required: list[str] + available_baudrates: list[int] default_timeout: int + model_baudrate_table: dict[str, dict] + model_ctrl_table: dict[str, dict] + model_number_table: dict[str, int] + model_resolution_table: dict[str, int] + normalization_required: list[str] def __init__( self, @@ -257,7 +259,6 @@ class MotorsBus(abc.ABC): ): self.port = port self.motors = motors - self._validate_motors() self.port_handler: PortHandler self.packet_handler: PacketHandler @@ -322,6 +323,14 @@ class MotorsBus(abc.ABC): else: raise TypeError(f"'{motor}' should be int, str.") + def _get_motor_model(self, motor: NameOrID) -> int: + if isinstance(motor, str): + return self.motors[motor].model + elif isinstance(motor, int): + return self._id_to_model_dict[motor] + else: + raise TypeError(f"'{motor}' should be int, str.") + def _validate_motors(self) -> None: if len(self.ids) != len(set(self.ids)): raise ValueError(f"Some motors have the same id!\n{self}") @@ -337,9 +346,10 @@ class MotorsBus(abc.ABC): return error != self._no_error def _assert_motors_exist(self) -> None: + # TODO(aliberts): collect all wrong ids/models and display them at once found_models = self.broadcast_ping() expected_models = {m.id: self.model_number_table[m.model] for m in self.motors.values()} - if not set(found_models) == set(self.ids): + if not found_models or set(found_models) != set(self.ids): raise RuntimeError( f"{self.__class__.__name__} is supposed to have these motors: ({{id: model_nb}})" f"\n{pformat(expected_models, indent=4, sort_dicts=False)}\n" @@ -371,11 +381,10 @@ class MotorsBus(abc.ABC): elif assert_motors_exist: self._assert_motors_exist() except (FileNotFoundError, OSError, serial.SerialException) as e: - logger.error( + raise ConnectionError( f"\nCould not connect on port '{self.port}'. Make sure you are using the correct port." "\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n" - ) - raise e + ) from e self.set_timeout() logger.debug(f"{self.__class__.__name__} connected.") @@ -433,12 +442,107 @@ class MotorsBus(abc.ABC): logger.error(e) return False + def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]: + """This assumes motors present positions are roughly in the middle of their desired range""" + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + else: + raise TypeError(motors) + + # Step 1: Set homing and min max to 0 + self.reset_homing_ranges(motors) + + # Step 2: Read Present_Position which will be Actual_Position since + # Present_Position = Actual_Position ± Homing_Offset (1) + # and Homing_Offset = 0 from step 1 + actual_positions = self.sync_read("Present_Position", motors, normalize=True) + + # Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of + # 1 revolution. + # For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current + # Present_Position to be 2047. In that example: + # Present_Position = 2047 (2) + # Actual_Position = X (read in step 2) + # from (1) and (2): + # => Homing_Offset = ±(X - 2048) + homing_offsets = self._get_half_turn_homings(actual_positions) + + for motor, offset in homing_offsets.items(): + self.write("Homing_Offset", motor, offset, normalize=False) + + return homing_offsets + + def reset_homing_ranges(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + else: + raise TypeError(motors) + + for motor in motors: + model = self._get_motor_model(motor) + max_res = self.model_resolution_table[model] - 1 + self.write("Homing_Offset", motor, 0, normalize=False) + self.write("Min_Position_Limit", motor, 0, normalize=False) + self.write("Max_Position_Limit", motor, max_res, normalize=False) + + def register_ranges_of_motion( + self, motors: NameOrID | list[NameOrID] | None = None + ) -> dict[NameOrID, dict[str, Value]]: + """ + This assumes that the homing offsets have been set such that all possible values in the range of + motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should + typically be called prior to this. + """ + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + else: + raise TypeError(motors) + + start_positions = self.sync_read("Present_Position", motors, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self.sync_read("Present_Position", motors, normalize=False) + mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()} + maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()} + + if _enter_pressed(): + break + + for motor in motors: + self.write("Min_Position_Limit", motor, mins[motor], normalize=False) + self.write("Max_Position_Limit", motor, maxes[motor], normalize=False) + + return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors} + @abc.abstractmethod - def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: + def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: pass @abc.abstractmethod - def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]: + def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: + pass + + @abc.abstractmethod + def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]: + pass + + @abc.abstractmethod + def _encode_value( + self, value: int, data_name: str | None = None, n_bytes: int | None = None + ) -> dict[int, int]: + pass + + @abc.abstractmethod + def _decode_value( + self, value: int, data_name: str | None = None, n_bytes: int | None = None + ) -> dict[int, int]: pass @staticmethod @@ -508,7 +612,7 @@ class MotorsBus(abc.ABC): @overload def sync_read( - self, data_name: str, motors: None = ..., *, raw_values: bool = ..., num_retry: int = ... + self, data_name: str, motors: None = ..., *, normalize: bool = ..., num_retry: int = ... ) -> dict[str, Value]: ... @overload def sync_read( @@ -516,7 +620,7 @@ class MotorsBus(abc.ABC): data_name: str, motors: NameOrID | list[NameOrID], *, - raw_values: bool = ..., + normalize: bool = ..., num_retry: int = ..., ) -> dict[NameOrID, Value]: ... def sync_read( @@ -524,7 +628,7 @@ class MotorsBus(abc.ABC): data_name: str, motors: NameOrID | list[NameOrID] | None = None, *, - raw_values: bool = False, + normalize: bool = True, num_retry: int = 0, ) -> dict[NameOrID, Value]: if not self.is_connected: @@ -551,8 +655,8 @@ class MotorsBus(abc.ABC): f"{self.packet_handler.getTxRxResult(comm)}" ) - if not raw_values and data_name in self.calibration_required and self.calibration is not None: - ids_values = self._calibrate_values(ids_values) + if normalize and data_name in self.normalization_required and self.calibration is not None: + ids_values = self._normalize(ids_values) return {id_key_map[id_]: val for id_, val in ids_values.items()} @@ -579,7 +683,11 @@ class MotorsBus(abc.ABC): logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})") logger.debug(self.packet_handler.getRxPacketError(comm)) - values = {id_: self.sync_reader.getData(id_, addr, n_bytes) for id_ in motor_ids} + values = {} + for id_ in motor_ids: + val = self.sync_reader.getData(id_, addr, n_bytes) + values[id_] = self._decode_value(val, data_name, n_bytes) + return comm, values def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None: @@ -608,7 +716,7 @@ class MotorsBus(abc.ABC): data_name: str, values: Value | dict[NameOrID, Value], *, - raw_values: bool = False, + normalize: bool = True, num_retry: int = 0, ) -> None: if not self.is_connected: @@ -621,10 +729,10 @@ class MotorsBus(abc.ABC): elif isinstance(values, dict): ids_values = {self._get_motor_id(motor): val for motor, val in values.items()} else: - raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}") + raise TypeError(f"'values' is expected to be a single value or a dict. Got {values}") - if not raw_values and data_name in self.calibration_required and self.calibration is not None: - ids_values = self._uncalibrate_values(ids_values) + if normalize and data_name in self.normalization_required and self.calibration is not None: + ids_values = self._unnormalize(ids_values) comm = self._sync_write(data_name, ids_values, num_retry=num_retry) if not self._is_comm_success(comm): @@ -640,6 +748,7 @@ class MotorsBus(abc.ABC): model = self._id_to_model(next(iter(ids_values))) addr, n_bytes = get_address(self.model_ctrl_table, model, data_name) + ids_values = {id_: self._encode_value(value, data_name, n_bytes) for id_, value in ids_values.items()} self._setup_sync_writer(ids_values, addr, n_bytes) for n_try in range(1 + num_retry): @@ -662,7 +771,7 @@ class MotorsBus(abc.ABC): self.sync_writer.addParam(id_, data) def write( - self, data_name: str, motor: NameOrID, value: Value, *, raw_value: bool = False, num_retry: int = 0 + self, data_name: str, motor: NameOrID, value: Value, *, normalize: bool = True, num_retry: int = 0 ) -> None: if not self.is_connected: raise DeviceNotConnectedError( @@ -671,8 +780,8 @@ class MotorsBus(abc.ABC): id_ = self._get_motor_id(motor) - if not raw_value and data_name in self.calibration_required and self.calibration is not None: - id_value = self._uncalibrate_values({id_: value}) + if normalize and data_name in self.normalization_required and self.calibration is not None: + id_value = self._unnormalize({id_: value}) value = id_value[id_] comm, error = self._write(data_name, id_, value, num_retry=num_retry) @@ -690,6 +799,7 @@ class MotorsBus(abc.ABC): def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]: model = self._id_to_model(motor_id) addr, n_bytes = get_address(self.model_ctrl_table, model, data_name) + value = self._encode_value(value, data_name, n_bytes) data = self._split_int_to_bytes(value, n_bytes) for n_try in range(1 + num_retry): diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index 563a7b81..9440d1d2 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -17,7 +17,9 @@ import logging import os import os.path as osp import platform +import select import subprocess +import sys from copy import copy from datetime import datetime, timezone from pathlib import Path @@ -228,3 +230,7 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool: except TypeError: # If a TypeError is raised, the string is not a valid dtype return False + + +def _enter_pressed() -> bool: + return select.select([sys.stdin], [], [], 0)[0] and sys.stdin.readline().strip() == ""