Add MotorsBus docstrings
This commit is contained in:
@@ -213,21 +213,19 @@ class GroupSyncWrite(Protocol):
|
|||||||
|
|
||||||
|
|
||||||
class MotorsBus(abc.ABC):
|
class MotorsBus(abc.ABC):
|
||||||
"""The main LeRobot class for implementing motors buses.
|
"""
|
||||||
|
A MotorsBus allows to efficiently read and write to the attached motors.
|
||||||
|
It represents several motors daisy-chained together and connected through a serial port.
|
||||||
There are currently two implementations of this abstract class:
|
There are currently two implementations of this abstract class:
|
||||||
- DynamixelMotorsBus
|
- DynamixelMotorsBus
|
||||||
- FeetechMotorsBus
|
- FeetechMotorsBus
|
||||||
|
|
||||||
Note: This class may evolve in the future should we add support for other manufacturers SDKs.
|
Note: This class may evolve in the future should we add support for other types of bus.
|
||||||
|
|
||||||
A MotorsBus allows to efficiently read and write to the attached motors.
|
|
||||||
It represents several motors daisy-chained together and connected through a serial port.
|
|
||||||
|
|
||||||
A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
|
A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
|
||||||
To find the port, you can run our utility script:
|
To find the port, you can run our utility script:
|
||||||
```bash
|
```bash
|
||||||
python lerobot/scripts/find_motors_bus_port.py
|
python -m lerobot.find_port.py
|
||||||
>>> Finding all available ports for the MotorsBus.
|
>>> Finding all available ports for the MotorsBus.
|
||||||
>>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
|
>>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
|
||||||
>>> Remove the usb cable from your MotorsBus and press Enter when done.
|
>>> Remove the usb cable from your MotorsBus and press Enter when done.
|
||||||
@@ -237,20 +235,20 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
Example of usage for 1 Feetech sts3215 motor connected to the bus:
|
Example of usage for 1 Feetech sts3215 motor connected to the bus:
|
||||||
```python
|
```python
|
||||||
motors_bus = FeetechMotorsBus(
|
bus = FeetechMotorsBus(
|
||||||
port="/dev/tty.usbmodem575E0031751",
|
port="/dev/tty.usbmodem575E0031751",
|
||||||
motors={"gripper": (6, "sts3215")},
|
motors={"my_motor": (1, "sts3215")},
|
||||||
)
|
)
|
||||||
motors_bus.connect()
|
bus.connect()
|
||||||
|
|
||||||
position = motors_bus.read("Present_Position")
|
position = bus.read("Present_Position", normalize=False)
|
||||||
|
|
||||||
# Move from a few motor steps as an example
|
# Move from a few motor steps as an example
|
||||||
few_steps = 30
|
few_steps = 30
|
||||||
motors_bus.write("Goal_Position", position + few_steps)
|
bus.write("Goal_Position", position + few_steps, normalize=False)
|
||||||
|
|
||||||
# When done, properly disconnect the port using
|
# When done, properly disconnect the port using
|
||||||
motors_bus.disconnect()
|
bus.disconnect()
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -403,9 +401,20 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def is_connected(self) -> bool:
|
def is_connected(self) -> bool:
|
||||||
|
"""bool: `True` if the underlying serial port is open."""
|
||||||
return self.port_handler.is_open
|
return self.port_handler.is_open
|
||||||
|
|
||||||
def connect(self, handshake: bool = True) -> None:
|
def connect(self, handshake: bool = True) -> None:
|
||||||
|
"""Open the serial port and initialise communication.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
handshake (bool, optional): Pings every expected motor and performs additional
|
||||||
|
integrity checks specific to the implementation. Defaults to `True`.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
DeviceAlreadyConnectedError: The port is already open.
|
||||||
|
ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed.
|
||||||
|
"""
|
||||||
if self.is_connected:
|
if self.is_connected:
|
||||||
raise DeviceAlreadyConnectedError(
|
raise DeviceAlreadyConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
|
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
|
||||||
@@ -431,8 +440,39 @@ class MotorsBus(abc.ABC):
|
|||||||
def _handshake(self) -> None:
|
def _handshake(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def disconnect(self, disable_torque: bool = True) -> None:
|
||||||
|
"""Close the serial port (optionally disabling torque first).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before
|
||||||
|
closing the port. This can prevent damaging motors if they are left applying resisting torque
|
||||||
|
after disconnect.
|
||||||
|
"""
|
||||||
|
if not self.is_connected:
|
||||||
|
raise DeviceNotConnectedError(
|
||||||
|
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
|
||||||
|
)
|
||||||
|
|
||||||
|
if disable_torque:
|
||||||
|
self.port_handler.clearPort()
|
||||||
|
self.port_handler.is_using = False
|
||||||
|
self.disable_torque(num_retry=5)
|
||||||
|
|
||||||
|
self.port_handler.closePort()
|
||||||
|
logger.debug(f"{self.__class__.__name__} disconnected.")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
|
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
|
||||||
|
"""Probe *port* at every supported baud-rate and list responding IDs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``).
|
||||||
|
*args, **kwargs: Forwarded to the subclass constructor.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[int, list[int]]: Mapping *baud-rate → list of motor IDs*
|
||||||
|
for every baud-rate that produced at least one response.
|
||||||
|
"""
|
||||||
bus = cls(port, {}, *args, **kwargs)
|
bus = cls(port, {}, *args, **kwargs)
|
||||||
bus._connect(handshake=False)
|
bus._connect(handshake=False)
|
||||||
baudrate_ids = {}
|
baudrate_ids = {}
|
||||||
@@ -448,6 +488,22 @@ class MotorsBus(abc.ABC):
|
|||||||
def setup_motor(
|
def setup_motor(
|
||||||
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
|
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Assign the correct ID and baud-rate to a single motor.
|
||||||
|
|
||||||
|
This helper temporarily switches to the motor's current settings, disables torque, sets the desired
|
||||||
|
ID, and finally programs the bus' default baud-rate.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motor (str): Key of the motor in :pyattr:`motors`.
|
||||||
|
initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided).
|
||||||
|
Defaults to None.
|
||||||
|
initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: The motor could not be found or its model number
|
||||||
|
does not match the expected one.
|
||||||
|
ConnectionError: Communication with the motor failed.
|
||||||
|
"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
self._connect(handshake=False)
|
self._connect(handshake=False)
|
||||||
|
|
||||||
@@ -479,10 +535,25 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def configure_motors(self) -> None:
|
def configure_motors(self) -> None:
|
||||||
|
"""Write implementation-specific recommended settings to every motor.
|
||||||
|
|
||||||
|
Typical changes include shortening the return delay, increasing
|
||||||
|
acceleration limits or disabling safety locks.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
|
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||||
|
"""Disable torque on selected motors.
|
||||||
|
|
||||||
|
Disabling Torque allows to write to the motors' permanent memory area (EPROM/EEPROM).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motors (int | str | list[str] | None, optional): Target motors. Accepts a motor name, an ID, a
|
||||||
|
list of names or `None` to affect every registered motor. Defaults to `None`.
|
||||||
|
num_retry (int, optional): Number of additional retry attempts on communication failure.
|
||||||
|
Defaults to 0.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
@@ -491,10 +562,26 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||||
|
"""Enable torque on selected motors.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motor (int): Same semantics as :pymeth:`disable_torque`. Defaults to `None`.
|
||||||
|
model (str): _description_
|
||||||
|
num_retry (int, optional): _description_. Defaults to 0.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def torque_disabled(self):
|
def torque_disabled(self):
|
||||||
|
"""Context-manager that guarantees torque is re-enabled.
|
||||||
|
|
||||||
|
This helper is useful to temporarily disable torque when configuring motors.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
>>> with bus.torque_disabled():
|
||||||
|
... # Safe operations here
|
||||||
|
... pass
|
||||||
|
"""
|
||||||
self.disable_torque()
|
self.disable_torque()
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
@@ -502,34 +589,74 @@ class MotorsBus(abc.ABC):
|
|||||||
self.enable_torque()
|
self.enable_torque()
|
||||||
|
|
||||||
def set_timeout(self, timeout_ms: int | None = None):
|
def set_timeout(self, timeout_ms: int | None = None):
|
||||||
|
"""Change the packet timeout used by the SDK.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls
|
||||||
|
back to :pyattr:`default_timeout`.
|
||||||
|
"""
|
||||||
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
|
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
|
||||||
self.port_handler.setPacketTimeoutMillis(timeout_ms)
|
self.port_handler.setPacketTimeoutMillis(timeout_ms)
|
||||||
|
|
||||||
def get_baudrate(self) -> int:
|
def get_baudrate(self) -> int:
|
||||||
|
"""Return the current baud-rate configured on the port.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: Baud-rate in bits / second.
|
||||||
|
"""
|
||||||
return self.port_handler.getBaudRate()
|
return self.port_handler.getBaudRate()
|
||||||
|
|
||||||
def set_baudrate(self, baudrate: int) -> None:
|
def set_baudrate(self, baudrate: int) -> None:
|
||||||
|
"""Set a new UART baud-rate on the port.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
baudrate (int): Desired baud-rate in bits / second.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: The SDK failed to apply the change.
|
||||||
|
"""
|
||||||
present_bus_baudrate = self.port_handler.getBaudRate()
|
present_bus_baudrate = self.port_handler.getBaudRate()
|
||||||
if present_bus_baudrate != baudrate:
|
if present_bus_baudrate != baudrate:
|
||||||
logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
|
logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
|
||||||
self.port_handler.setBaudRate(baudrate)
|
self.port_handler.setBaudRate(baudrate)
|
||||||
|
|
||||||
if self.port_handler.getBaudRate() != baudrate:
|
if self.port_handler.getBaudRate() != baudrate:
|
||||||
raise OSError("Failed to write bus baud rate.")
|
raise RuntimeError("Failed to write bus baud rate.")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_calibrated(self) -> bool:
|
def is_calibrated(self) -> bool:
|
||||||
|
"""bool: ``True`` if the cached calibration matches the motors."""
|
||||||
return self.calibration == self.read_calibration()
|
return self.calibration == self.read_calibration()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def read_calibration(self) -> dict[str, MotorCalibration]:
|
def read_calibration(self) -> dict[str, MotorCalibration]:
|
||||||
|
"""Read calibration parameters from the motors.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[str, MotorCalibration]: Mapping *motor name → calibration*.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
||||||
|
"""Write calibration parameters to the motors and cache them.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
|
||||||
|
:pymeth:`read_calibration` or crafted by the user.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
||||||
|
"""Restore factory calibration for the selected motors.
|
||||||
|
|
||||||
|
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
|
||||||
|
The in-memory :pyattr:`calibration` is cleared.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
|
||||||
|
resets every motor.
|
||||||
|
"""
|
||||||
if motors is None:
|
if motors is None:
|
||||||
motors = list(self.motors)
|
motors = list(self.motors)
|
||||||
elif isinstance(motors, (str, int)):
|
elif isinstance(motors, (str, int)):
|
||||||
@@ -547,24 +674,16 @@ class MotorsBus(abc.ABC):
|
|||||||
self.calibration = {}
|
self.calibration = {}
|
||||||
|
|
||||||
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
|
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
|
||||||
"""
|
"""Centre each motor range around its current position.
|
||||||
This assumes motors present positions are roughly in the middle of their desired range
|
|
||||||
|
|
||||||
Step 1: Set homing and min max to 0
|
The function computes and writes a homing offset such that the present position becomes exactly one
|
||||||
|
half-turn (e.g. `2047` on a 12-bit encoder).
|
||||||
|
|
||||||
Step 2: Read Present_Position which will be Actual_Position since
|
Args:
|
||||||
Present_Position = Actual_Position ± Homing_Offset (1)
|
motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`).
|
||||||
and Homing_Offset = 0 from step 1
|
|
||||||
|
|
||||||
Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of 1
|
Returns:
|
||||||
revolution. For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the
|
dict[NameOrID, Value]: Mapping *motor → written homing offset*.
|
||||||
current Present_Position to be 2047.
|
|
||||||
|
|
||||||
In that example:
|
|
||||||
Present_Position = 2047 (2)
|
|
||||||
Actual_Position = X (read in step 2)
|
|
||||||
from (1) and (2):
|
|
||||||
=> Homing_Offset = ±(X - 2048)
|
|
||||||
"""
|
"""
|
||||||
if motors is None:
|
if motors is None:
|
||||||
motors = list(self.motors)
|
motors = list(self.motors)
|
||||||
@@ -588,10 +707,19 @@ class MotorsBus(abc.ABC):
|
|||||||
def record_ranges_of_motion(
|
def record_ranges_of_motion(
|
||||||
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
|
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
|
||||||
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
|
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
|
||||||
"""
|
"""Interactively record the min/max encoder values of each motor.
|
||||||
This assumes that the homing offsets have been set such that all possible values in the range of
|
|
||||||
motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should
|
Move the joints by hand (with torque disabled) while the method streams live positions. Press
|
||||||
typically be called prior to this.
|
:kbd:`Enter` to finish.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motors (NameOrID | list[NameOrID] | None, optional): Motors to record.
|
||||||
|
Defaults to every motor (`None`).
|
||||||
|
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
|
||||||
|
extreme values observed for each motor.
|
||||||
"""
|
"""
|
||||||
if motors is None:
|
if motors is None:
|
||||||
motors = list(self.motors)
|
motors = list(self.motors)
|
||||||
@@ -621,6 +749,7 @@ class MotorsBus(abc.ABC):
|
|||||||
# Move cursor up to overwrite the previous output
|
# Move cursor up to overwrite the previous output
|
||||||
move_cursor_up(len(motors) + 3)
|
move_cursor_up(len(motors) + 3)
|
||||||
|
|
||||||
|
# TODO(Steven, aliberts): add check to ensure mins and maxes are different
|
||||||
return mins, maxes
|
return mins, maxes
|
||||||
|
|
||||||
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
@@ -702,6 +831,17 @@ class MotorsBus(abc.ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
||||||
|
"""Ping a single motor and return its model number.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motor (NameOrID): Target motor (name or ID).
|
||||||
|
num_retry (int, optional): Extra attempts before giving up. Defaults to `0`.
|
||||||
|
raise_on_error (bool, optional): If `True` communication errors raise exceptions instead of
|
||||||
|
returning `None`. Defaults to `False`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int | None: Motor model number or `None` on failure.
|
||||||
|
"""
|
||||||
id_ = self._get_motor_id(motor)
|
id_ = self._get_motor_id(motor)
|
||||||
for n_try in range(1 + num_retry):
|
for n_try in range(1 + num_retry):
|
||||||
model_number, comm, error = self.packet_handler.ping(self.port_handler, id_)
|
model_number, comm, error = self.packet_handler.ping(self.port_handler, id_)
|
||||||
@@ -724,6 +864,16 @@ class MotorsBus(abc.ABC):
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
|
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
|
||||||
|
"""Ping every ID on the bus using the broadcast address.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||||
|
raise_on_error (bool, optional): When `True` failures raise an exception instead of returning
|
||||||
|
`None`. Defaults to `False`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[int, int] | None: Mapping *id → model number* or `None` if the call failed.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def read(
|
def read(
|
||||||
@@ -734,6 +884,18 @@ class MotorsBus(abc.ABC):
|
|||||||
normalize: bool = True,
|
normalize: bool = True,
|
||||||
num_retry: int = 0,
|
num_retry: int = 0,
|
||||||
) -> Value:
|
) -> Value:
|
||||||
|
"""Read a register from a motor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_name (str): Control-table key (e.g. `"Present_Position"`).
|
||||||
|
motor (str): Motor name.
|
||||||
|
normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as
|
||||||
|
defined by the calibration.
|
||||||
|
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Value: Raw or normalised value depending on *normalize*.
|
||||||
|
"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
@@ -791,6 +953,21 @@ class MotorsBus(abc.ABC):
|
|||||||
def write(
|
def write(
|
||||||
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
|
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Write a value to a single motor's register.
|
||||||
|
|
||||||
|
Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which
|
||||||
|
provides a guarantee that the value was written to the register successfully. In consequence, it is
|
||||||
|
slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring
|
||||||
|
motors.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_name (str): Register name.
|
||||||
|
motor (str): Motor name.
|
||||||
|
value (Value): Value to write. If *normalize* is `True` the value is first converted to raw
|
||||||
|
units.
|
||||||
|
normalize (bool, optional): Enable or disable normalisation. Defaults to `True`.
|
||||||
|
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||||
|
"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
@@ -844,6 +1021,17 @@ class MotorsBus(abc.ABC):
|
|||||||
normalize: bool = True,
|
normalize: bool = True,
|
||||||
num_retry: int = 0,
|
num_retry: int = 0,
|
||||||
) -> dict[str, Value]:
|
) -> dict[str, Value]:
|
||||||
|
"""Read the same register from several motors at once.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_name (str): Register name.
|
||||||
|
motors (str | list[str] | None, optional): Motors to query. `None` (default) reads every motor.
|
||||||
|
normalize (bool, optional): Normalisation flag. Defaults to `True`.
|
||||||
|
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[str, Value]: Mapping *motor name → value*.
|
||||||
|
"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
@@ -928,6 +1116,19 @@ class MotorsBus(abc.ABC):
|
|||||||
normalize: bool = True,
|
normalize: bool = True,
|
||||||
num_retry: int = 0,
|
num_retry: int = 0,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Write the same register on multiple motors.
|
||||||
|
|
||||||
|
Contrary to :pymeth:`write`, this *does not* expects a response status packet emitted by the motor, which
|
||||||
|
can allow for lost packets. It is faster than :pymeth:`write` and should typically be used when
|
||||||
|
frequency matters and losing some packets is acceptable (e.g. teleoperation loops).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_name (str): Register name.
|
||||||
|
values (Value | dict[str, Value]): Either a single value (applied to every motor) or a mapping
|
||||||
|
*motor name → value*.
|
||||||
|
normalize (bool, optional): If `True` (default) convert values from the user range to raw units.
|
||||||
|
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||||
|
"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
@@ -980,17 +1181,3 @@ class MotorsBus(abc.ABC):
|
|||||||
for id_, value in ids_values.items():
|
for id_, value in ids_values.items():
|
||||||
data = self._serialize_data(value, length)
|
data = self._serialize_data(value, length)
|
||||||
self.sync_writer.addParam(id_, data)
|
self.sync_writer.addParam(id_, data)
|
||||||
|
|
||||||
def disconnect(self, disable_torque: bool = True) -> None:
|
|
||||||
if not self.is_connected:
|
|
||||||
raise DeviceNotConnectedError(
|
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
|
|
||||||
)
|
|
||||||
|
|
||||||
if disable_torque:
|
|
||||||
self.port_handler.clearPort()
|
|
||||||
self.port_handler.is_using = False
|
|
||||||
self.disable_torque(num_retry=5)
|
|
||||||
|
|
||||||
self.port_handler.closePort()
|
|
||||||
logger.debug(f"{self.__class__.__name__} disconnected.")
|
|
||||||
|
|||||||
Reference in New Issue
Block a user