forked from tangger/lerobot
Add calibration utilities
This commit is contained in:
@@ -22,19 +22,50 @@ import logging
|
|||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
from ..motors_bus import Motor, MotorsBus
|
from ..motors_bus import Motor, MotorsBus, NameOrID, Value
|
||||||
from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_NUMBER, MODEL_RESOLUTION
|
from .tables import (
|
||||||
|
AVAILABLE_BAUDRATES,
|
||||||
|
MODEL_BAUDRATE_TABLE,
|
||||||
|
MODEL_CONTROL_TABLE,
|
||||||
|
MODEL_NUMBER,
|
||||||
|
MODEL_RESOLUTION,
|
||||||
|
)
|
||||||
|
|
||||||
PROTOCOL_VERSION = 2.0
|
PROTOCOL_VERSION = 2.0
|
||||||
BAUDRATE = 1_000_000
|
BAUDRATE = 1_000_000
|
||||||
DEFAULT_TIMEOUT_MS = 1000
|
DEFAULT_TIMEOUT_MS = 1000
|
||||||
|
|
||||||
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||||
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_twos_complement(value: int, n_bytes: int):
|
||||||
|
if value >= 0:
|
||||||
|
return value
|
||||||
|
|
||||||
|
bit_width = n_bytes * 8
|
||||||
|
min_val = -(1 << (bit_width - 1))
|
||||||
|
max_val = (1 << (bit_width - 1)) - 1
|
||||||
|
|
||||||
|
if not (min_val <= value <= max_val):
|
||||||
|
raise ValueError(
|
||||||
|
f"Value {value} out of range for {n_bytes}-byte two's complement: [{min_val}, {max_val}]"
|
||||||
|
)
|
||||||
|
|
||||||
|
return (1 << bit_width) + value
|
||||||
|
|
||||||
|
|
||||||
|
def decode_twos_complement(value: int, n_bytes: int) -> int:
|
||||||
|
# https://en.wikipedia.org/wiki/Two%27s_complement
|
||||||
|
bits = n_bytes * 8
|
||||||
|
sign_bit = 1 << (bits - 1)
|
||||||
|
if value & sign_bit:
|
||||||
|
value -= 1 << bits
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
class OperatingMode(Enum):
|
class OperatingMode(Enum):
|
||||||
# DYNAMIXEL only controls current(torque) regardless of speed and position. This mode is ideal for a
|
# DYNAMIXEL only controls current(torque) regardless of speed and position. This mode is ideal for a
|
||||||
# gripper or a system that only uses current(torque) control or a system that has additional
|
# gripper or a system that only uses current(torque) control or a system that has additional
|
||||||
@@ -82,12 +113,13 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20
|
https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20
|
||||||
"""
|
"""
|
||||||
|
|
||||||
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
|
||||||
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
|
||||||
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
|
||||||
model_number_table = deepcopy(MODEL_NUMBER)
|
|
||||||
calibration_required = deepcopy(CALIBRATION_REQUIRED)
|
|
||||||
default_timeout = DEFAULT_TIMEOUT_MS
|
default_timeout = DEFAULT_TIMEOUT_MS
|
||||||
|
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
||||||
|
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||||
|
model_number_table = deepcopy(MODEL_NUMBER)
|
||||||
|
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
||||||
|
normalization_required = deepcopy(NORMALIZATION_REQUIRED)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -110,14 +142,33 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
for id_ in self.ids:
|
for id_ in self.ids:
|
||||||
self.write("Return_Delay_Time", id_, 0)
|
self.write("Return_Delay_Time", id_, 0)
|
||||||
|
|
||||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||||
|
"""
|
||||||
|
On Dynamixel Motors:
|
||||||
|
Present_Position = Actual_Position + Homing_Offset
|
||||||
|
"""
|
||||||
|
half_turn_homings = {}
|
||||||
|
for motor, pos in positions.items():
|
||||||
|
model = self._get_motor_model(motor)
|
||||||
|
max_res = self.model_resolution_table[model] - 1
|
||||||
|
half_turn_homings[motor] = int(max_res / 2) - pos
|
||||||
|
|
||||||
|
return half_turn_homings
|
||||||
|
|
||||||
|
def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
# TODO
|
# TODO
|
||||||
return ids_values
|
return ids_values
|
||||||
|
|
||||||
def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
|
def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
|
||||||
# TODO
|
# TODO
|
||||||
return ids_values
|
return ids_values
|
||||||
|
|
||||||
|
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||||
|
return encode_twos_complement(value, n_bytes)
|
||||||
|
|
||||||
|
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||||
|
return decode_twos_complement(value, n_bytes)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
||||||
# Validate input
|
# Validate input
|
||||||
@@ -154,11 +205,11 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
if self._is_comm_success(comm):
|
if self._is_comm_success(comm):
|
||||||
break
|
break
|
||||||
logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})")
|
logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})")
|
||||||
logger.debug(self.packet_handler.getRxPacketError(comm))
|
logger.debug(self.packet_handler.getTxRxResult(comm))
|
||||||
|
|
||||||
if not self._is_comm_success(comm):
|
if not self._is_comm_success(comm):
|
||||||
if raise_on_error:
|
if raise_on_error:
|
||||||
raise ConnectionError(self.packet_handler.getRxPacketError(comm))
|
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
|
||||||
|
|
||||||
return data_list if data_list else None
|
return data_list if data_list else None
|
||||||
|
|
||||||
|
|||||||
@@ -17,13 +17,15 @@ from copy import deepcopy
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
|
||||||
from ..motors_bus import Motor, MotorsBus
|
from ..motors_bus import Motor, MotorsBus, NameOrID, Value
|
||||||
from .tables import (
|
from .tables import (
|
||||||
CALIBRATION_REQUIRED,
|
AVAILABLE_BAUDRATES,
|
||||||
|
ENCODINGS,
|
||||||
MODEL_BAUDRATE_TABLE,
|
MODEL_BAUDRATE_TABLE,
|
||||||
MODEL_CONTROL_TABLE,
|
MODEL_CONTROL_TABLE,
|
||||||
MODEL_NUMBER,
|
MODEL_NUMBER,
|
||||||
MODEL_RESOLUTION,
|
MODEL_RESOLUTION,
|
||||||
|
NORMALIZATION_REQUIRED,
|
||||||
)
|
)
|
||||||
|
|
||||||
PROTOCOL_VERSION = 0
|
PROTOCOL_VERSION = 0
|
||||||
@@ -33,6 +35,29 @@ DEFAULT_TIMEOUT_MS = 1000
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_sign_magnitude(value: int, sign_bit_index: int):
|
||||||
|
"""
|
||||||
|
https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude
|
||||||
|
"""
|
||||||
|
max_magnitude = (1 << sign_bit_index) - 1
|
||||||
|
magnitude = abs(value)
|
||||||
|
if magnitude > max_magnitude:
|
||||||
|
raise ValueError(f"Magnitude {magnitude} exceeds {max_magnitude} (max for {sign_bit_index=})")
|
||||||
|
|
||||||
|
direction_bit = 1 if value < 0 else 0
|
||||||
|
return (direction_bit << sign_bit_index) | magnitude
|
||||||
|
|
||||||
|
|
||||||
|
def decode_sign_magnitude(encoded_value: int, sign_bit_index: int):
|
||||||
|
"""
|
||||||
|
https://en.wikipedia.org/wiki/Signed_number_representations#Sign%E2%80%93magnitude
|
||||||
|
"""
|
||||||
|
direction_bit = (encoded_value >> sign_bit_index) & 1
|
||||||
|
magnitude_mask = (1 << sign_bit_index) - 1
|
||||||
|
magnitude = encoded_value & magnitude_mask
|
||||||
|
return -magnitude if direction_bit else magnitude
|
||||||
|
|
||||||
|
|
||||||
class OperatingMode(Enum):
|
class OperatingMode(Enum):
|
||||||
# position servo mode
|
# position servo mode
|
||||||
POSITION = 0
|
POSITION = 0
|
||||||
@@ -63,12 +88,16 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
|
python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
|
||||||
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
|
||||||
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
|
||||||
model_number_table = deepcopy(MODEL_NUMBER)
|
|
||||||
calibration_required = deepcopy(CALIBRATION_REQUIRED)
|
|
||||||
default_timeout = DEFAULT_TIMEOUT_MS
|
default_timeout = DEFAULT_TIMEOUT_MS
|
||||||
|
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
||||||
|
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||||
|
model_number_table = deepcopy(MODEL_NUMBER)
|
||||||
|
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
||||||
|
normalization_required = deepcopy(NORMALIZATION_REQUIRED)
|
||||||
|
|
||||||
|
# Feetech specific
|
||||||
|
encodings = deepcopy(ENCODINGS)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -89,16 +118,37 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the
|
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the
|
||||||
# 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
# 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
||||||
for id_ in self.ids:
|
for id_ in self.ids:
|
||||||
self.write("Return_Delay", id_, 0)
|
self.write("Return_Delay_Time", id_, 0)
|
||||||
|
|
||||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||||
|
"""
|
||||||
|
On Feetech Motors:
|
||||||
|
Present_Position = Actual_Position - Homing_Offset
|
||||||
|
"""
|
||||||
|
half_turn_homings = {}
|
||||||
|
for motor, pos in positions.items():
|
||||||
|
model = self._get_motor_model(motor)
|
||||||
|
max_res = self.model_resolution_table[model] - 1
|
||||||
|
half_turn_homings[motor] = pos - int(max_res / 2)
|
||||||
|
|
||||||
|
return half_turn_homings
|
||||||
|
|
||||||
|
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
# TODO
|
# TODO
|
||||||
return ids_values
|
return ids_values
|
||||||
|
|
||||||
def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
|
def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]:
|
||||||
# TODO
|
# TODO
|
||||||
return ids_values
|
return ids_values
|
||||||
|
|
||||||
|
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||||
|
sign_bit = self.encodings.get(data_name)
|
||||||
|
return encode_sign_magnitude(value, sign_bit) if sign_bit is not None else value
|
||||||
|
|
||||||
|
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||||
|
sign_bit = self.encodings.get(data_name)
|
||||||
|
return decode_sign_magnitude(value, sign_bit) if sign_bit is not None else value
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
||||||
# Validate input
|
# Validate input
|
||||||
@@ -114,8 +164,6 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
|
|
||||||
import scservo_sdk as scs
|
import scservo_sdk as scs
|
||||||
|
|
||||||
# Note: No need to convert back into unsigned int, since this byte preprocessing
|
|
||||||
# already handles it for us.
|
|
||||||
if n_bytes == 1:
|
if n_bytes == 1:
|
||||||
data = [value]
|
data = [value]
|
||||||
elif n_bytes == 2:
|
elif n_bytes == 2:
|
||||||
|
|||||||
@@ -6,10 +6,10 @@ SCS_SERIES_CONTROL_TABLE = {
|
|||||||
"Model_Number": (3, 2),
|
"Model_Number": (3, 2),
|
||||||
"ID": (5, 1),
|
"ID": (5, 1),
|
||||||
"Baud_Rate": (6, 1),
|
"Baud_Rate": (6, 1),
|
||||||
"Return_Delay": (7, 1),
|
"Return_Delay_Time": (7, 1),
|
||||||
"Response_Status_Level": (8, 1),
|
"Response_Status_Level": (8, 1),
|
||||||
"Min_Angle_Limit": (9, 2),
|
"Min_Position_Limit": (9, 2),
|
||||||
"Max_Angle_Limit": (11, 2),
|
"Max_Position_Limit": (11, 2),
|
||||||
"Max_Temperature_Limit": (13, 1),
|
"Max_Temperature_Limit": (13, 1),
|
||||||
"Max_Voltage_Limit": (14, 1),
|
"Max_Voltage_Limit": (14, 1),
|
||||||
"Min_Voltage_Limit": (15, 1),
|
"Min_Voltage_Limit": (15, 1),
|
||||||
@@ -25,8 +25,8 @@ SCS_SERIES_CONTROL_TABLE = {
|
|||||||
"CCW_Dead_Zone": (27, 1),
|
"CCW_Dead_Zone": (27, 1),
|
||||||
"Protection_Current": (28, 2),
|
"Protection_Current": (28, 2),
|
||||||
"Angular_Resolution": (30, 1),
|
"Angular_Resolution": (30, 1),
|
||||||
"Offset": (31, 2),
|
"Homing_Offset": (31, 2),
|
||||||
"Mode": (33, 1),
|
"Operating_Mode": (33, 1),
|
||||||
"Protective_Torque": (34, 1),
|
"Protective_Torque": (34, 1),
|
||||||
"Protection_Time": (35, 1),
|
"Protection_Time": (35, 1),
|
||||||
"Overload_Torque": (36, 1),
|
"Overload_Torque": (36, 1),
|
||||||
@@ -83,6 +83,14 @@ MODEL_BAUDRATE_TABLE = {
|
|||||||
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
|
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||||
|
|
||||||
|
# Sign-Magnitude encoding bits
|
||||||
|
ENCODINGS = {
|
||||||
|
"Homing_Offset": 11,
|
||||||
|
"Goal_Speed": 15,
|
||||||
|
}
|
||||||
|
|
||||||
AVAILABLE_BAUDRATES = [
|
AVAILABLE_BAUDRATES = [
|
||||||
4800,
|
4800,
|
||||||
9600,
|
9600,
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ from deepdiff import DeepDiff
|
|||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||||
|
from lerobot.common.utils.utils import _enter_pressed
|
||||||
|
|
||||||
NameOrID: TypeAlias = str | int
|
NameOrID: TypeAlias = str | int
|
||||||
Value: TypeAlias = int | float
|
Value: TypeAlias = int | float
|
||||||
@@ -243,12 +244,13 @@ class MotorsBus(abc.ABC):
|
|||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
|
|
||||||
model_ctrl_table: dict[str, dict]
|
available_baudrates: list[int]
|
||||||
model_resolution_table: dict[str, int]
|
|
||||||
model_baudrate_table: dict[str, dict]
|
|
||||||
model_number_table: dict[str, int]
|
|
||||||
calibration_required: list[str]
|
|
||||||
default_timeout: int
|
default_timeout: int
|
||||||
|
model_baudrate_table: dict[str, dict]
|
||||||
|
model_ctrl_table: dict[str, dict]
|
||||||
|
model_number_table: dict[str, int]
|
||||||
|
model_resolution_table: dict[str, int]
|
||||||
|
normalization_required: list[str]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -257,7 +259,6 @@ class MotorsBus(abc.ABC):
|
|||||||
):
|
):
|
||||||
self.port = port
|
self.port = port
|
||||||
self.motors = motors
|
self.motors = motors
|
||||||
self._validate_motors()
|
|
||||||
|
|
||||||
self.port_handler: PortHandler
|
self.port_handler: PortHandler
|
||||||
self.packet_handler: PacketHandler
|
self.packet_handler: PacketHandler
|
||||||
@@ -322,6 +323,14 @@ class MotorsBus(abc.ABC):
|
|||||||
else:
|
else:
|
||||||
raise TypeError(f"'{motor}' should be int, str.")
|
raise TypeError(f"'{motor}' should be int, str.")
|
||||||
|
|
||||||
|
def _get_motor_model(self, motor: NameOrID) -> int:
|
||||||
|
if isinstance(motor, str):
|
||||||
|
return self.motors[motor].model
|
||||||
|
elif isinstance(motor, int):
|
||||||
|
return self._id_to_model_dict[motor]
|
||||||
|
else:
|
||||||
|
raise TypeError(f"'{motor}' should be int, str.")
|
||||||
|
|
||||||
def _validate_motors(self) -> None:
|
def _validate_motors(self) -> None:
|
||||||
if len(self.ids) != len(set(self.ids)):
|
if len(self.ids) != len(set(self.ids)):
|
||||||
raise ValueError(f"Some motors have the same id!\n{self}")
|
raise ValueError(f"Some motors have the same id!\n{self}")
|
||||||
@@ -337,9 +346,10 @@ class MotorsBus(abc.ABC):
|
|||||||
return error != self._no_error
|
return error != self._no_error
|
||||||
|
|
||||||
def _assert_motors_exist(self) -> None:
|
def _assert_motors_exist(self) -> None:
|
||||||
|
# TODO(aliberts): collect all wrong ids/models and display them at once
|
||||||
found_models = self.broadcast_ping()
|
found_models = self.broadcast_ping()
|
||||||
expected_models = {m.id: self.model_number_table[m.model] for m in self.motors.values()}
|
expected_models = {m.id: self.model_number_table[m.model] for m in self.motors.values()}
|
||||||
if not set(found_models) == set(self.ids):
|
if not found_models or set(found_models) != set(self.ids):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"{self.__class__.__name__} is supposed to have these motors: ({{id: model_nb}})"
|
f"{self.__class__.__name__} is supposed to have these motors: ({{id: model_nb}})"
|
||||||
f"\n{pformat(expected_models, indent=4, sort_dicts=False)}\n"
|
f"\n{pformat(expected_models, indent=4, sort_dicts=False)}\n"
|
||||||
@@ -371,11 +381,10 @@ class MotorsBus(abc.ABC):
|
|||||||
elif assert_motors_exist:
|
elif assert_motors_exist:
|
||||||
self._assert_motors_exist()
|
self._assert_motors_exist()
|
||||||
except (FileNotFoundError, OSError, serial.SerialException) as e:
|
except (FileNotFoundError, OSError, serial.SerialException) as e:
|
||||||
logger.error(
|
raise ConnectionError(
|
||||||
f"\nCould not connect on port '{self.port}'. Make sure you are using the correct port."
|
f"\nCould not connect on port '{self.port}'. Make sure you are using the correct port."
|
||||||
"\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n"
|
"\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n"
|
||||||
)
|
) from e
|
||||||
raise e
|
|
||||||
|
|
||||||
self.set_timeout()
|
self.set_timeout()
|
||||||
logger.debug(f"{self.__class__.__name__} connected.")
|
logger.debug(f"{self.__class__.__name__} connected.")
|
||||||
@@ -433,12 +442,107 @@ class MotorsBus(abc.ABC):
|
|||||||
logger.error(e)
|
logger.error(e)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
|
||||||
|
"""This assumes motors present positions are roughly in the middle of their desired range"""
|
||||||
|
if motors is None:
|
||||||
|
motors = self.names
|
||||||
|
elif isinstance(motors, (str, int)):
|
||||||
|
motors = [motors]
|
||||||
|
else:
|
||||||
|
raise TypeError(motors)
|
||||||
|
|
||||||
|
# Step 1: Set homing and min max to 0
|
||||||
|
self.reset_homing_ranges(motors)
|
||||||
|
|
||||||
|
# Step 2: Read Present_Position which will be Actual_Position since
|
||||||
|
# Present_Position = Actual_Position ± Homing_Offset (1)
|
||||||
|
# and Homing_Offset = 0 from step 1
|
||||||
|
actual_positions = self.sync_read("Present_Position", motors, normalize=True)
|
||||||
|
|
||||||
|
# Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of
|
||||||
|
# 1 revolution.
|
||||||
|
# For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the current
|
||||||
|
# Present_Position to be 2047. In that example:
|
||||||
|
# Present_Position = 2047 (2)
|
||||||
|
# Actual_Position = X (read in step 2)
|
||||||
|
# from (1) and (2):
|
||||||
|
# => Homing_Offset = ±(X - 2048)
|
||||||
|
homing_offsets = self._get_half_turn_homings(actual_positions)
|
||||||
|
|
||||||
|
for motor, offset in homing_offsets.items():
|
||||||
|
self.write("Homing_Offset", motor, offset, normalize=False)
|
||||||
|
|
||||||
|
return homing_offsets
|
||||||
|
|
||||||
|
def reset_homing_ranges(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
||||||
|
if motors is None:
|
||||||
|
motors = self.names
|
||||||
|
elif isinstance(motors, (str, int)):
|
||||||
|
motors = [motors]
|
||||||
|
else:
|
||||||
|
raise TypeError(motors)
|
||||||
|
|
||||||
|
for motor in motors:
|
||||||
|
model = self._get_motor_model(motor)
|
||||||
|
max_res = self.model_resolution_table[model] - 1
|
||||||
|
self.write("Homing_Offset", motor, 0, normalize=False)
|
||||||
|
self.write("Min_Position_Limit", motor, 0, normalize=False)
|
||||||
|
self.write("Max_Position_Limit", motor, max_res, normalize=False)
|
||||||
|
|
||||||
|
def register_ranges_of_motion(
|
||||||
|
self, motors: NameOrID | list[NameOrID] | None = None
|
||||||
|
) -> dict[NameOrID, dict[str, Value]]:
|
||||||
|
"""
|
||||||
|
This assumes that the homing offsets have been set such that all possible values in the range of
|
||||||
|
motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should
|
||||||
|
typically be called prior to this.
|
||||||
|
"""
|
||||||
|
if motors is None:
|
||||||
|
motors = self.names
|
||||||
|
elif isinstance(motors, (str, int)):
|
||||||
|
motors = [motors]
|
||||||
|
else:
|
||||||
|
raise TypeError(motors)
|
||||||
|
|
||||||
|
start_positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||||
|
mins = start_positions.copy()
|
||||||
|
maxes = start_positions.copy()
|
||||||
|
while True:
|
||||||
|
positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||||
|
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
|
||||||
|
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
|
||||||
|
|
||||||
|
if _enter_pressed():
|
||||||
|
break
|
||||||
|
|
||||||
|
for motor in motors:
|
||||||
|
self.write("Min_Position_Limit", motor, mins[motor], normalize=False)
|
||||||
|
self.write("Max_Position_Limit", motor, maxes[motor], normalize=False)
|
||||||
|
|
||||||
|
return {motor: {"min": mins[motor], "max": maxes[motor]} for motor in motors}
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
|
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _unnormalize(self, data_name: str, ids_values: dict[int, float]) -> dict[int, int]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _encode_value(
|
||||||
|
self, value: int, data_name: str | None = None, n_bytes: int | None = None
|
||||||
|
) -> dict[int, int]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _decode_value(
|
||||||
|
self, value: int, data_name: str | None = None, n_bytes: int | None = None
|
||||||
|
) -> dict[int, int]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -508,7 +612,7 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
@overload
|
@overload
|
||||||
def sync_read(
|
def sync_read(
|
||||||
self, data_name: str, motors: None = ..., *, raw_values: bool = ..., num_retry: int = ...
|
self, data_name: str, motors: None = ..., *, normalize: bool = ..., num_retry: int = ...
|
||||||
) -> dict[str, Value]: ...
|
) -> dict[str, Value]: ...
|
||||||
@overload
|
@overload
|
||||||
def sync_read(
|
def sync_read(
|
||||||
@@ -516,7 +620,7 @@ class MotorsBus(abc.ABC):
|
|||||||
data_name: str,
|
data_name: str,
|
||||||
motors: NameOrID | list[NameOrID],
|
motors: NameOrID | list[NameOrID],
|
||||||
*,
|
*,
|
||||||
raw_values: bool = ...,
|
normalize: bool = ...,
|
||||||
num_retry: int = ...,
|
num_retry: int = ...,
|
||||||
) -> dict[NameOrID, Value]: ...
|
) -> dict[NameOrID, Value]: ...
|
||||||
def sync_read(
|
def sync_read(
|
||||||
@@ -524,7 +628,7 @@ class MotorsBus(abc.ABC):
|
|||||||
data_name: str,
|
data_name: str,
|
||||||
motors: NameOrID | list[NameOrID] | None = None,
|
motors: NameOrID | list[NameOrID] | None = None,
|
||||||
*,
|
*,
|
||||||
raw_values: bool = False,
|
normalize: bool = True,
|
||||||
num_retry: int = 0,
|
num_retry: int = 0,
|
||||||
) -> dict[NameOrID, Value]:
|
) -> dict[NameOrID, Value]:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
@@ -551,8 +655,8 @@ class MotorsBus(abc.ABC):
|
|||||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not raw_values and data_name in self.calibration_required and self.calibration is not None:
|
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||||
ids_values = self._calibrate_values(ids_values)
|
ids_values = self._normalize(ids_values)
|
||||||
|
|
||||||
return {id_key_map[id_]: val for id_, val in ids_values.items()}
|
return {id_key_map[id_]: val for id_, val in ids_values.items()}
|
||||||
|
|
||||||
@@ -579,7 +683,11 @@ class MotorsBus(abc.ABC):
|
|||||||
logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})")
|
logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})")
|
||||||
logger.debug(self.packet_handler.getRxPacketError(comm))
|
logger.debug(self.packet_handler.getRxPacketError(comm))
|
||||||
|
|
||||||
values = {id_: self.sync_reader.getData(id_, addr, n_bytes) for id_ in motor_ids}
|
values = {}
|
||||||
|
for id_ in motor_ids:
|
||||||
|
val = self.sync_reader.getData(id_, addr, n_bytes)
|
||||||
|
values[id_] = self._decode_value(val, data_name, n_bytes)
|
||||||
|
|
||||||
return comm, values
|
return comm, values
|
||||||
|
|
||||||
def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None:
|
def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None:
|
||||||
@@ -608,7 +716,7 @@ class MotorsBus(abc.ABC):
|
|||||||
data_name: str,
|
data_name: str,
|
||||||
values: Value | dict[NameOrID, Value],
|
values: Value | dict[NameOrID, Value],
|
||||||
*,
|
*,
|
||||||
raw_values: bool = False,
|
normalize: bool = True,
|
||||||
num_retry: int = 0,
|
num_retry: int = 0,
|
||||||
) -> None:
|
) -> None:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
@@ -621,10 +729,10 @@ class MotorsBus(abc.ABC):
|
|||||||
elif isinstance(values, dict):
|
elif isinstance(values, dict):
|
||||||
ids_values = {self._get_motor_id(motor): val for motor, val in values.items()}
|
ids_values = {self._get_motor_id(motor): val for motor, val in values.items()}
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}")
|
raise TypeError(f"'values' is expected to be a single value or a dict. Got {values}")
|
||||||
|
|
||||||
if not raw_values and data_name in self.calibration_required and self.calibration is not None:
|
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||||
ids_values = self._uncalibrate_values(ids_values)
|
ids_values = self._unnormalize(ids_values)
|
||||||
|
|
||||||
comm = self._sync_write(data_name, ids_values, num_retry=num_retry)
|
comm = self._sync_write(data_name, ids_values, num_retry=num_retry)
|
||||||
if not self._is_comm_success(comm):
|
if not self._is_comm_success(comm):
|
||||||
@@ -640,6 +748,7 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
model = self._id_to_model(next(iter(ids_values)))
|
model = self._id_to_model(next(iter(ids_values)))
|
||||||
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
||||||
|
ids_values = {id_: self._encode_value(value, data_name, n_bytes) for id_, value in ids_values.items()}
|
||||||
self._setup_sync_writer(ids_values, addr, n_bytes)
|
self._setup_sync_writer(ids_values, addr, n_bytes)
|
||||||
|
|
||||||
for n_try in range(1 + num_retry):
|
for n_try in range(1 + num_retry):
|
||||||
@@ -662,7 +771,7 @@ class MotorsBus(abc.ABC):
|
|||||||
self.sync_writer.addParam(id_, data)
|
self.sync_writer.addParam(id_, data)
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
self, data_name: str, motor: NameOrID, value: Value, *, raw_value: bool = False, num_retry: int = 0
|
self, data_name: str, motor: NameOrID, value: Value, *, normalize: bool = True, num_retry: int = 0
|
||||||
) -> None:
|
) -> None:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
@@ -671,8 +780,8 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
id_ = self._get_motor_id(motor)
|
id_ = self._get_motor_id(motor)
|
||||||
|
|
||||||
if not raw_value and data_name in self.calibration_required and self.calibration is not None:
|
if normalize and data_name in self.normalization_required and self.calibration is not None:
|
||||||
id_value = self._uncalibrate_values({id_: value})
|
id_value = self._unnormalize({id_: value})
|
||||||
value = id_value[id_]
|
value = id_value[id_]
|
||||||
|
|
||||||
comm, error = self._write(data_name, id_, value, num_retry=num_retry)
|
comm, error = self._write(data_name, id_, value, num_retry=num_retry)
|
||||||
@@ -690,6 +799,7 @@ class MotorsBus(abc.ABC):
|
|||||||
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
def _write(self, data_name: str, motor_id: int, value: int, num_retry: int = 0) -> tuple[int, int]:
|
||||||
model = self._id_to_model(motor_id)
|
model = self._id_to_model(motor_id)
|
||||||
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
|
||||||
|
value = self._encode_value(value, data_name, n_bytes)
|
||||||
data = self._split_int_to_bytes(value, n_bytes)
|
data = self._split_int_to_bytes(value, n_bytes)
|
||||||
|
|
||||||
for n_try in range(1 + num_retry):
|
for n_try in range(1 + num_retry):
|
||||||
|
|||||||
@@ -17,7 +17,9 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import os.path as osp
|
import os.path as osp
|
||||||
import platform
|
import platform
|
||||||
|
import select
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import sys
|
||||||
from copy import copy
|
from copy import copy
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -228,3 +230,7 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool:
|
|||||||
except TypeError:
|
except TypeError:
|
||||||
# If a TypeError is raised, the string is not a valid dtype
|
# If a TypeError is raised, the string is not a valid dtype
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _enter_pressed() -> bool:
|
||||||
|
return select.select([sys.stdin], [], [], 0)[0] and sys.stdin.readline().strip() == ""
|
||||||
|
|||||||
Reference in New Issue
Block a user