forked from tangger/lerobot
Add more calibration utilities
This commit is contained in:
@@ -1 +1 @@
|
||||
from .motors_bus import Motor, MotorNormMode, MotorsBus
|
||||
from .motors_bus import Motor, MotorCalibration, MotorNormMode, MotorsBus
|
||||
|
||||
@@ -132,14 +132,6 @@ class DynamixelMotorsBus(MotorsBus):
|
||||
|
||||
return half_turn_homings
|
||||
|
||||
def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||
return encode_twos_complement(value, n_bytes)
|
||||
|
||||
|
||||
@@ -112,14 +112,6 @@ class FeetechMotorsBus(MotorsBus):
|
||||
|
||||
return half_turn_homings
|
||||
|
||||
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||
sign_bit = self.encodings.get(data_name)
|
||||
return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value
|
||||
|
||||
@@ -85,6 +85,15 @@ class MotorNormMode(Enum):
|
||||
VELOCITY = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class MotorCalibration:
|
||||
id: int
|
||||
drive_mode: int
|
||||
homing_offset: int
|
||||
range_min: int
|
||||
range_max: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class Motor:
|
||||
id: int
|
||||
@@ -267,7 +276,7 @@ class MotorsBus(abc.ABC):
|
||||
self._comm_success: int
|
||||
self._no_error: int
|
||||
|
||||
self.calibration = None
|
||||
self.calibration: dict[str, MotorCalibration] = {}
|
||||
|
||||
self._id_to_model_dict = {m.id: m.model for m in self.motors.values()}
|
||||
self._id_to_name_dict = {m.id: name for name, m in self.motors.items()}
|
||||
@@ -387,6 +396,7 @@ class MotorsBus(abc.ABC):
|
||||
) from e
|
||||
|
||||
self.set_timeout()
|
||||
self.calibration = self.get_calibration()
|
||||
logger.debug(f"{self.__class__.__name__} connected.")
|
||||
|
||||
@classmethod
|
||||
@@ -429,19 +439,6 @@ class MotorsBus(abc.ABC):
|
||||
if self.port_handler.getBaudRate() != baudrate:
|
||||
raise OSError("Failed to write bus baud rate.")
|
||||
|
||||
@property
|
||||
def are_motors_configured(self) -> bool:
|
||||
"""
|
||||
Only check the motor indices and not baudrate, since if the motor baudrates are incorrect, a
|
||||
ConnectionError will be raised anyway.
|
||||
"""
|
||||
try:
|
||||
# TODO(aliberts): use ping instead
|
||||
return (self.ids == self.sync_read("ID")).all()
|
||||
except ConnectionError as e:
|
||||
logger.error(e)
|
||||
return False
|
||||
|
||||
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
|
||||
"""This assumes motors present positions are roughly in the middle of their desired range"""
|
||||
if motors is None:
|
||||
@@ -457,7 +454,7 @@ class MotorsBus(abc.ABC):
|
||||
# Step 2: Read Present_Position which will be Actual_Position since
|
||||
# Present_Position = Actual_Position ± Homing_Offset (1)
|
||||
# and Homing_Offset = 0 from step 1
|
||||
actual_positions = self.sync_read("Present_Position", motors, normalize=True)
|
||||
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||
|
||||
# Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of
|
||||
# 1 revolution.
|
||||
@@ -479,7 +476,7 @@ class MotorsBus(abc.ABC):
|
||||
motors = self.names
|
||||
elif isinstance(motors, (str, int)):
|
||||
motors = [motors]
|
||||
else:
|
||||
elif not isinstance(motors, list):
|
||||
raise TypeError(motors)
|
||||
|
||||
for motor in motors:
|
||||
@@ -501,7 +498,7 @@ class MotorsBus(abc.ABC):
|
||||
motors = self.names
|
||||
elif isinstance(motors, (str, int)):
|
||||
motors = [motors]
|
||||
else:
|
||||
elif not isinstance(motors, list):
|
||||
raise TypeError(motors)
|
||||
|
||||
start_positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||
@@ -521,17 +518,66 @@ class MotorsBus(abc.ABC):
|
||||
|
||||
return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors}
|
||||
|
||||
def get_calibration(self) -> dict[str, MotorCalibration]:
|
||||
offsets = self.sync_read("Homing_Offset", normalize=False)
|
||||
mins = self.sync_read("Min_Position_Limit", normalize=False)
|
||||
maxes = self.sync_read("Max_Position_Limit", normalize=False)
|
||||
|
||||
try:
|
||||
drive_modes = self.sync_read("Drive_Mode", normalize=False)
|
||||
except KeyError:
|
||||
drive_modes = {name: 0 for name in self.names}
|
||||
|
||||
calibration = {}
|
||||
for name, motor in self.motors.items():
|
||||
calibration[name] = MotorCalibration(
|
||||
id=motor.id,
|
||||
drive_mode=drive_modes[name],
|
||||
homing_offset=offsets[name],
|
||||
range_min=mins[name],
|
||||
range_max=maxes[name],
|
||||
)
|
||||
|
||||
return calibration
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
pass
|
||||
normalized_values = {}
|
||||
for id_, val in ids_values.items():
|
||||
name = self._id_to_name(id_)
|
||||
min_ = self.calibration[name].range_min
|
||||
max_ = self.calibration[name].range_max
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100:
|
||||
normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100:
|
||||
normalized_values[id_] = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
else:
|
||||
# TODO(alibers): velocity and degree modes
|
||||
raise NotImplementedError
|
||||
|
||||
return normalized_values
|
||||
|
||||
@abc.abstractmethod
|
||||
def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]:
|
||||
pass
|
||||
unnormalized_values = {}
|
||||
for id_, val in ids_values.items():
|
||||
name = self._id_to_name(id_)
|
||||
min_ = self.calibration[name].range_min
|
||||
max_ = self.calibration[name].range_max
|
||||
if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100:
|
||||
bounded_val = min(100.0, max(-100.0, val))
|
||||
unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_)
|
||||
elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100:
|
||||
bounded_val = min(100.0, max(0.0, val))
|
||||
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
|
||||
else:
|
||||
# TODO(alibers): velocity and degree modes
|
||||
raise NotImplementedError
|
||||
|
||||
return unnormalized_values
|
||||
|
||||
@abc.abstractmethod
|
||||
def _encode_value(
|
||||
@@ -655,8 +701,8 @@ class MotorsBus(abc.ABC):
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
|
||||
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||
ids_values = self._normalize(ids_values)
|
||||
if normalize and data_name in self.normalization_required:
|
||||
ids_values = self._normalize(data_name, ids_values)
|
||||
|
||||
return {id_key_map[id_]: val for id_, val in ids_values.items()}
|
||||
|
||||
@@ -732,7 +778,7 @@ class MotorsBus(abc.ABC):
|
||||
raise TypeError(f"'values' is expected to be a single value or a dict. Got {values}")
|
||||
|
||||
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||
ids_values = self._unnormalize(ids_values)
|
||||
ids_values = self._unnormalize(data_name, ids_values)
|
||||
|
||||
comm = self._sync_write(data_name, ids_values, num_retry=num_retry)
|
||||
if not self._is_comm_success(comm):
|
||||
@@ -781,7 +827,7 @@ class MotorsBus(abc.ABC):
|
||||
id_ = self._get_motor_id(motor)
|
||||
|
||||
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||
id_value = self._unnormalize({id_: value})
|
||||
id_value = self._unnormalize(data_name, {id_: value})
|
||||
value = id_value[id_]
|
||||
|
||||
comm, error = self._write(data_name, id_, value, num_retry=num_retry)
|
||||
|
||||
Reference in New Issue
Block a user