Fix normalization drive_mode

This commit is contained in:
Simon Alibert
2025-05-22 11:32:52 +02:00
parent cac4289619
commit 386ad61007
4 changed files with 32 additions and 19 deletions

View File

@@ -108,6 +108,7 @@ class DynamixelMotorsBus(MotorsBus):
https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20
"""
apply_drive_mode = False
available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
default_baudrate = DEFAULT_BAUDRATE
default_timeout = DEFAULT_TIMEOUT_MS

View File

@@ -102,6 +102,7 @@ class FeetechMotorsBus(MotorsBus):
python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
"""
apply_drive_mode = True
available_baudrates = deepcopy(SCAN_BAUDRATES)
default_baudrate = DEFAULT_BAUDRATE
default_timeout = DEFAULT_TIMEOUT_MS

View File

@@ -252,6 +252,7 @@ class MotorsBus(abc.ABC):
```
"""
apply_drive_mode: bool
available_baudrates: list[int]
default_baudrate: int
default_timeout: int
@@ -755,7 +756,7 @@ class MotorsBus(abc.ABC):
return mins, maxes
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
@@ -764,20 +765,24 @@ class MotorsBus(abc.ABC):
motor = self._id_to_name(id_)
min_ = self.calibration[motor].range_min
max_ = self.calibration[motor].range_max
drive_mode = self.calibration[motor].drive_mode and self.apply_drive_mode
if max_ == min_:
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
bounded_val = min(max_, max(min_, val))
# TODO(Steven): normalization can go boom if max_ == min_, we should add a check probably in record_ranges_of_motions
# (which probably indicates the user forgot to move a motor, most likely a gripper-like one)
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[id_] = -norm if drive_mode else norm
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
normalized_values[id_] = ((bounded_val - min_) / (max_ - min_)) * 100
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[id_] = 100 - norm if drive_mode else norm
else:
# TODO(alibers): velocity and degree modes
# TODO(alibers): degree mode
raise NotImplementedError
return normalized_values
def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]:
def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
@@ -786,14 +791,20 @@ class MotorsBus(abc.ABC):
motor = self._id_to_name(id_)
min_ = self.calibration[motor].range_min
max_ = self.calibration[motor].range_max
drive_mode = self.calibration[motor].drive_mode and self.apply_drive_mode
if max_ == min_:
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
val = -val if drive_mode else val
bounded_val = min(100.0, max(-100.0, val))
unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_)
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
val = 100 - val if drive_mode else val
bounded_val = min(100.0, max(0.0, val))
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
else:
# TODO(alibers): velocity and degree modes
# TODO(aliberts): degree mode
raise NotImplementedError
return unnormalized_values
@@ -914,7 +925,7 @@ class MotorsBus(abc.ABC):
id_value = self._decode_sign(data_name, {id_: value})
if normalize and data_name in self.normalized_data:
id_value = self._normalize(data_name, id_value)
id_value = self._normalize(id_value)
return id_value[id_]
@@ -981,7 +992,7 @@ class MotorsBus(abc.ABC):
addr, length = get_address(self.model_ctrl_table, model, data_name)
if normalize and data_name in self.normalized_data:
value = self._unnormalize(data_name, {id_: value})[id_]
value = self._unnormalize({id_: value})[id_]
value = self._encode_sign(data_name, {id_: value})[id_]
@@ -1060,7 +1071,7 @@ class MotorsBus(abc.ABC):
ids_values = self._decode_sign(data_name, ids_values)
if normalize and data_name in self.normalized_data:
ids_values = self._normalize(data_name, ids_values)
ids_values = self._normalize(ids_values)
return {self._id_to_name(id_): value for id_, value in ids_values.items()}
@@ -1146,7 +1157,7 @@ class MotorsBus(abc.ABC):
addr, length = get_address(self.model_ctrl_table, model, data_name)
if normalize and data_name in self.normalized_data:
ids_values = self._unnormalize(data_name, ids_values)
ids_values = self._unnormalize(ids_values)
ids_values = self._encode_sign(data_name, ids_values)

View File

@@ -125,7 +125,7 @@ def test_read(data_name, id_, value, dummy_motors):
)
mock__decode_sign.assert_called_once_with(data_name, {id_: value})
if data_name in bus.normalized_data:
mock__normalize.assert_called_once_with(data_name, {id_: value})
mock__normalize.assert_called_once_with({id_: value})
@pytest.mark.parametrize(
@@ -159,7 +159,7 @@ def test_write(data_name, id_, value, dummy_motors):
)
mock__encode_sign.assert_called_once_with(data_name, {id_: value})
if data_name in bus.normalized_data:
mock__unnormalize.assert_called_once_with(data_name, {id_: value})
mock__unnormalize.assert_called_once_with({id_: value})
@pytest.mark.parametrize(
@@ -196,7 +196,7 @@ def test_sync_read_by_str(data_name, id_, value, dummy_motors):
)
mock__decode_sign.assert_called_once_with(data_name, {id_: value})
if data_name in bus.normalized_data:
mock__normalize.assert_called_once_with(data_name, {id_: value})
mock__normalize.assert_called_once_with({id_: value})
@pytest.mark.parametrize(
@@ -233,7 +233,7 @@ def test_sync_read_by_list(data_name, ids_values, dummy_motors):
)
mock__decode_sign.assert_called_once_with(data_name, ids_values)
if data_name in bus.normalized_data:
mock__normalize.assert_called_once_with(data_name, ids_values)
mock__normalize.assert_called_once_with(ids_values)
@pytest.mark.parametrize(
@@ -270,7 +270,7 @@ def test_sync_read_by_none(data_name, ids_values, dummy_motors):
)
mock__decode_sign.assert_called_once_with(data_name, ids_values)
if data_name in bus.normalized_data:
mock__normalize.assert_called_once_with(data_name, ids_values)
mock__normalize.assert_called_once_with(ids_values)
@pytest.mark.parametrize(
@@ -304,7 +304,7 @@ def test_sync_write_by_single_value(data_name, value, dummy_motors):
)
mock__encode_sign.assert_called_once_with(data_name, ids_values)
if data_name in bus.normalized_data:
mock__unnormalize.assert_called_once_with(data_name, ids_values)
mock__unnormalize.assert_called_once_with(ids_values)
@pytest.mark.parametrize(
@@ -339,4 +339,4 @@ def test_sync_write_by_value_dict(data_name, ids_values, dummy_motors):
)
mock__encode_sign.assert_called_once_with(data_name, ids_values)
if data_name in bus.normalized_data:
mock__unnormalize.assert_called_once_with(data_name, ids_values)
mock__unnormalize.assert_called_once_with(ids_values)