diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index bae0f977..2abce818 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -24,7 +24,7 @@ from enum import Enum from lerobot.common.utils.encoding_utils import decode_twos_complement, encode_twos_complement -from ..motors_bus import Motor, MotorsBus, NameOrID, Value +from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value from .tables import ( AVAILABLE_BAUDRATES, MODEL_BAUDRATE_TABLE, @@ -102,8 +102,9 @@ class DynamixelMotorsBus(MotorsBus): self, port: str, motors: dict[str, Motor], + calibration: dict[str, MotorCalibration] | None = None, ): - super().__init__(port, motors) + super().__init__(port, motors, calibration) import dynamixel_sdk as dxl self.port_handler = dxl.PortHandler(self.port) @@ -113,12 +114,26 @@ class DynamixelMotorsBus(MotorsBus): self._comm_success = dxl.COMM_SUCCESS self._no_error = 0x00 - def _configure_motors(self) -> None: + def configure_motors(self) -> None: # By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). for id_ in self.ids: self.write("Return_Delay_Time", id_, 0) + def _disable_torque(self, motors: list[NameOrID]) -> None: + for motor in motors: + self.write("Torque_Enable", motor, TorqueMode.DISABLED.value) + + def _enable_torque(self, motors: list[NameOrID]) -> None: + for motor in motors: + self.write("Torque_Enable", motor, TorqueMode.ENABLED.value) + + def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + return encode_twos_complement(value, n_bytes) + + def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: + return decode_twos_complement(value, n_bytes) + def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: """ On Dynamixel Motors: @@ -132,12 +147,6 @@ class DynamixelMotorsBus(MotorsBus): return half_turn_homings - def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: - return encode_twos_complement(value, n_bytes) - - def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: - return decode_twos_complement(value, n_bytes) - @staticmethod def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: # Validate input diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index dc5de28d..4f49f870 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -19,7 +19,7 @@ from pprint import pformat from lerobot.common.utils.encoding_utils import decode_sign_magnitude, encode_sign_magnitude -from ..motors_bus import Motor, MotorsBus, NameOrID, Value +from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value from .tables import ( AVAILABLE_BAUDRATES, ENCODINGS, @@ -82,8 +82,9 @@ class FeetechMotorsBus(MotorsBus): self, port: str, motors: dict[str, Motor], + calibration: dict[str, MotorCalibration] | None = None, ): - super().__init__(port, motors) + super().__init__(port, motors, calibration) import scservo_sdk as scs self.port_handler = scs.PortHandler(self.port) @@ -93,7 +94,7 @@ class FeetechMotorsBus(MotorsBus): self._comm_success = scs.COMM_SUCCESS self._no_error = 0x00 - def _configure_motors(self) -> None: + def configure_motors(self) -> None: # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the # 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0). for id_ in self.ids: @@ -112,6 +113,16 @@ class FeetechMotorsBus(MotorsBus): return half_turn_homings + def _disable_torque(self, motors: list[NameOrID]) -> None: + for motor in motors: + self.write("Torque_Enable", motor, TorqueMode.DISABLED.value) + self.write("Lock", motor, 0) + + def _enable_torque(self, motors: list[NameOrID]) -> None: + for motor in motors: + self.write("Torque_Enable", motor, TorqueMode.ENABLED.value) + self.write("Lock", motor, 1) + def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: sign_bit = self.encodings.get(data_name) return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 29903be8..dc97d686 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -32,7 +32,7 @@ from deepdiff import DeepDiff from tqdm import tqdm from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.utils.utils import _enter_pressed +from lerobot.common.utils.utils import enter_pressed, move_cursor_up NameOrID: TypeAlias = str | int Value: TypeAlias = int | float @@ -265,9 +265,11 @@ class MotorsBus(abc.ABC): self, port: str, motors: dict[str, Motor], + calibration: dict[str, MotorCalibration] | None = None, ): self.port = port self.motors = motors + self.calibration = calibration if calibration else {} self.port_handler: PortHandler self.packet_handler: PacketHandler @@ -276,8 +278,6 @@ class MotorsBus(abc.ABC): self._comm_success: int self._no_error: int - self.calibration: dict[str, MotorCalibration] = {} - self._id_to_model_dict = {m.id: m.model for m in self.motors.values()} self._id_to_name_dict = {m.id: name for name, m in self.motors.items()} self._model_nb_to_model_dict = {v: k for k, v in self.model_number_table.items()} @@ -396,7 +396,6 @@ class MotorsBus(abc.ABC): ) from e self.set_timeout() - self.calibration = self.get_calibration() logger.debug(f"{self.__class__.__name__} connected.") @classmethod @@ -420,7 +419,37 @@ class MotorsBus(abc.ABC): return baudrate_ids @abc.abstractmethod - def _configure_motors(self) -> None: + def configure_motors(self) -> None: + pass + + def disable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + pass + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + elif not isinstance(motors, list): + raise TypeError(motors) + + self._disable_torque(motors) + + def enable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + pass + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + elif not isinstance(motors, list): + raise TypeError(motors) + + self._enable_torque(motors) + + @abc.abstractmethod + def _enable_torque(self, motors: list[NameOrID]) -> None: + pass + + @abc.abstractmethod + def _disable_torque(self, motors: list[NameOrID]) -> None: pass def set_timeout(self, timeout_ms: int | None = None): @@ -439,39 +468,7 @@ class MotorsBus(abc.ABC): if self.port_handler.getBaudRate() != baudrate: raise OSError("Failed to write bus baud rate.") - def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]: - """This assumes motors present positions are roughly in the middle of their desired range""" - if motors is None: - motors = self.names - elif isinstance(motors, (str, int)): - motors = [motors] - else: - raise TypeError(motors) - - # Step 1: Set homing and min max to 0 - self.reset_homing_ranges(motors) - - # Step 2: Read Present_Position which will be Actual_Position since - # Present_Position = Actual_Position ± Homing_Offset (1) - # and Homing_Offset = 0 from step 1 - actual_positions = self.sync_read("Present_Position", motors, normalize=False) - - # Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of - # 1 revolution. - # For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current - # Present_Position to be 2047. In that example: - # Present_Position = 2047 (2) - # Actual_Position = X (read in step 2) - # from (1) and (2): - # => Homing_Offset = ±(X - 2048) - homing_offsets = self._get_half_turn_homings(actual_positions) - - for motor, offset in homing_offsets.items(): - self.write("Homing_Offset", motor, offset, normalize=False) - - return homing_offsets - - def reset_homing_ranges(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None: if motors is None: motors = self.names elif isinstance(motors, (str, int)): @@ -486,39 +483,11 @@ class MotorsBus(abc.ABC): self.write("Min_Position_Limit", motor, 0, normalize=False) self.write("Max_Position_Limit", motor, max_res, normalize=False) - def register_ranges_of_motion( - self, motors: NameOrID | list[NameOrID] | None = None - ) -> dict[NameOrID, dict[str, Value]]: - """ - This assumes that the homing offsets have been set such that all possible values in the range of - motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should - typically be called prior to this. - """ - if motors is None: - motors = self.names - elif isinstance(motors, (str, int)): - motors = [motors] - elif not isinstance(motors, list): - raise TypeError(motors) + @property + def is_calibrated(self) -> bool: + return self.calibration == self.read_calibration() - start_positions = self.sync_read("Present_Position", motors, normalize=False) - mins = start_positions.copy() - maxes = start_positions.copy() - while True: - positions = self.sync_read("Present_Position", motors, normalize=False) - mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()} - maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()} - - if _enter_pressed(): - break - - for motor in motors: - self.write("Min_Position_Limit", motor, mins[motor], normalize=False) - self.write("Max_Position_Limit", motor, maxes[motor], normalize=False) - - return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors} - - def get_calibration(self) -> dict[str, MotorCalibration]: + def read_calibration(self) -> dict[str, MotorCalibration]: offsets = self.sync_read("Homing_Offset", normalize=False) mins = self.sync_read("Min_Position_Limit", normalize=False) maxes = self.sync_read("Max_Position_Limit", normalize=False) @@ -540,6 +509,83 @@ class MotorsBus(abc.ABC): return calibration + def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: + for motor, calibration in calibration_dict.items(): + self.write("Homing_Offset", motor, calibration.homing_offset) + self.write("Min_Position_Limit", motor, calibration.range_min) + self.write("Max_Position_Limit", motor, calibration.range_max) + + self.calibration = calibration_dict + + def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]: + """This assumes motors present positions are roughly in the middle of their desired range""" + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + else: + raise TypeError(motors) + + # Step 1: Set homing and min max to 0 + self.reset_calibration(motors) + + # Step 2: Read Present_Position which will be Actual_Position since + # Present_Position = Actual_Position ± Homing_Offset (1) + # and Homing_Offset = 0 from step 1 + actual_positions = self.sync_read("Present_Position", motors, normalize=False) + + # Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of + # 1 revolution. + # For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current + # Present_Position to be 2047. In that example: + # Present_Position = 2047 (2) + # Actual_Position = X (read in step 2) + # from (1) and (2): + # => Homing_Offset = ±(X - 2048) + homing_offsets = self._get_half_turn_homings(actual_positions) + for motor, offset in homing_offsets.items(): + self.write("Homing_Offset", motor, offset) + + return homing_offsets + + def record_ranges_of_motion( + self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True + ) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: + """ + This assumes that the homing offsets have been set such that all possible values in the range of + motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should + typically be called prior to this. + """ + if motors is None: + motors = self.names + elif isinstance(motors, (str, int)): + motors = [motors] + elif not isinstance(motors, list): + raise TypeError(motors) + + start_positions = self.sync_read("Present_Position", motors, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self.sync_read("Present_Position", motors, normalize=False) + mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()} + maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for name in motors: + print(f"{name:<15} | {mins[name]:>6} | {positions[name]:>6} | {maxes[name]:>6}") + + if enter_pressed(): + break + + if display_values: + # Move cursor up to overwrite the previous output + move_cursor_up(len(motors) + 3) + + return mins, maxes + @abc.abstractmethod def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: pass diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index 9440d1d2..756ad9f0 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -232,5 +232,10 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool: return False -def _enter_pressed() -> bool: +def enter_pressed() -> bool: return select.select([sys.stdin], [], [], 0)[0] and sys.stdin.readline().strip() == "" + + +def move_cursor_up(lines): + """Move the cursor up by a specified number of lines.""" + print(f"\033[{lines}A", end="")