diff --git a/lerobot/common/motors/__init__.py b/lerobot/common/motors/__init__.py index 852317ea..dfbfbaee 100644 --- a/lerobot/common/motors/__init__.py +++ b/lerobot/common/motors/__init__.py @@ -1 +1 @@ -from .motors_bus import Motor, MotorNormMode, MotorsBus +from .motors_bus import Motor, MotorCalibration, MotorNormMode, MotorsBus diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index e6d6808c..bae0f977 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -132,14 +132,6 @@ class DynamixelMotorsBus(MotorsBus): return half_turn_homings - def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]: - # TODO - return ids_values - - def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]: - # TODO - return ids_values - def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: return encode_twos_complement(value, n_bytes) diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index c0bd41fd..dc5de28d 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -112,14 +112,6 @@ class FeetechMotorsBus(MotorsBus): return half_turn_homings - def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: - # TODO - return ids_values - - def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]: - # TODO - return ids_values - def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: sign_bit = self.encodings.get(data_name) return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 7389f88a..29903be8 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -85,6 +85,15 @@ class MotorNormMode(Enum): VELOCITY = 3 +@dataclass +class MotorCalibration: + id: int + drive_mode: int + homing_offset: int + range_min: int + range_max: int + + @dataclass class Motor: id: int @@ -267,7 +276,7 @@ class MotorsBus(abc.ABC): self._comm_success: int self._no_error: int - self.calibration = None + self.calibration: dict[str, MotorCalibration] = {} self._id_to_model_dict = {m.id: m.model for m in self.motors.values()} self._id_to_name_dict = {m.id: name for name, m in self.motors.items()} @@ -387,6 +396,7 @@ class MotorsBus(abc.ABC): ) from e self.set_timeout() + self.calibration = self.get_calibration() logger.debug(f"{self.__class__.__name__} connected.") @classmethod @@ -429,19 +439,6 @@ class MotorsBus(abc.ABC): if self.port_handler.getBaudRate() != baudrate: raise OSError("Failed to write bus baud rate.") - @property - def are_motors_configured(self) -> bool: - """ - Only check the motor indices and not baudrate, since if the motor baudrates are incorrect, a - ConnectionError will be raised anyway. - """ - try: - # TODO(aliberts): use ping instead - return (self.ids == self.sync_read("ID")).all() - except ConnectionError as e: - logger.error(e) - return False - def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]: """This assumes motors present positions are roughly in the middle of their desired range""" if motors is None: @@ -457,7 +454,7 @@ class MotorsBus(abc.ABC): # Step 2: Read Present_Position which will be Actual_Position since # Present_Position = Actual_Position ± Homing_Offset (1) # and Homing_Offset = 0 from step 1 - actual_positions = self.sync_read("Present_Position", motors, normalize=True) + actual_positions = self.sync_read("Present_Position", motors, normalize=False) # Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of # 1 revolution. @@ -479,7 +476,7 @@ class MotorsBus(abc.ABC): motors = self.names elif isinstance(motors, (str, int)): motors = [motors] - else: + elif not isinstance(motors, list): raise TypeError(motors) for motor in motors: @@ -501,7 +498,7 @@ class MotorsBus(abc.ABC): motors = self.names elif isinstance(motors, (str, int)): motors = [motors] - else: + elif not isinstance(motors, list): raise TypeError(motors) start_positions = self.sync_read("Present_Position", motors, normalize=False) @@ -521,17 +518,66 @@ class MotorsBus(abc.ABC): return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors} + def get_calibration(self) -> dict[str, MotorCalibration]: + offsets = self.sync_read("Homing_Offset", normalize=False) + mins = self.sync_read("Min_Position_Limit", normalize=False) + maxes = self.sync_read("Max_Position_Limit", normalize=False) + + try: + drive_modes = self.sync_read("Drive_Mode", normalize=False) + except KeyError: + drive_modes = {name: 0 for name in self.names} + + calibration = {} + for name, motor in self.motors.items(): + calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=drive_modes[name], + homing_offset=offsets[name], + range_min=mins[name], + range_max=maxes[name], + ) + + return calibration + @abc.abstractmethod def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: pass - @abc.abstractmethod def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: - pass + normalized_values = {} + for id_, val in ids_values.items(): + name = self._id_to_name(id_) + min_ = self.calibration[name].range_min + max_ = self.calibration[name].range_max + bounded_val = min(max_, max(min_, val)) + if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100: + normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100: + normalized_values[id_] = ((bounded_val - min_) / (max_ - min_)) * 100 + else: + # TODO(alibers): velocity and degree modes + raise NotImplementedError + + return normalized_values - @abc.abstractmethod def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]: - pass + unnormalized_values = {} + for id_, val in ids_values.items(): + name = self._id_to_name(id_) + min_ = self.calibration[name].range_min + max_ = self.calibration[name].range_max + if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100: + bounded_val = min(100.0, max(-100.0, val)) + unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_) + elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100: + bounded_val = min(100.0, max(0.0, val)) + unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_) + else: + # TODO(alibers): velocity and degree modes + raise NotImplementedError + + return unnormalized_values @abc.abstractmethod def _encode_value( @@ -655,8 +701,8 @@ class MotorsBus(abc.ABC): f"{self.packet_handler.getTxRxResult(comm)}" ) - if normalize and data_name in self.normalization_required and self.calibration is not None: - ids_values = self._normalize(ids_values) + if normalize and data_name in self.normalization_required: + ids_values = self._normalize(data_name, ids_values) return {id_key_map[id_]: val for id_, val in ids_values.items()} @@ -732,7 +778,7 @@ class MotorsBus(abc.ABC): raise TypeError(f"'values' is expected to be a single value or a dict. Got {values}") if normalize and data_name in self.normalization_required and self.calibration is not None: - ids_values = self._unnormalize(ids_values) + ids_values = self._unnormalize(data_name, ids_values) comm = self._sync_write(data_name, ids_values, num_retry=num_retry) if not self._is_comm_success(comm): @@ -781,7 +827,7 @@ class MotorsBus(abc.ABC): id_ = self._get_motor_id(motor) if normalize and data_name in self.normalization_required and self.calibration is not None: - id_value = self._unnormalize({id_: value}) + id_value = self._unnormalize(data_name, {id_: value}) value = id_value[id_] comm, error = self._write(data_name, id_, value, num_retry=num_retry)