From 742708942c0a808f91bfe6e02c34c0667a99562e Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Tue, 13 May 2025 13:24:46 +0200 Subject: [PATCH] Add MotorsBus docstrings --- lerobot/common/motors/motors_bus.py | 281 +++++++++++++++++++++++----- 1 file changed, 234 insertions(+), 47 deletions(-) diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index fbae15ee..2a444d45 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -213,21 +213,19 @@ class GroupSyncWrite(Protocol): class MotorsBus(abc.ABC): - """The main LeRobot class for implementing motors buses. - + """ + A MotorsBus allows to efficiently read and write to the attached motors. + It represents several motors daisy-chained together and connected through a serial port. There are currently two implementations of this abstract class: - DynamixelMotorsBus - FeetechMotorsBus - Note: This class may evolve in the future should we add support for other manufacturers SDKs. - - A MotorsBus allows to efficiently read and write to the attached motors. - It represents several motors daisy-chained together and connected through a serial port. + Note: This class may evolve in the future should we add support for other types of bus. A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)). To find the port, you can run our utility script: ```bash - python lerobot/scripts/find_motors_bus_port.py + python -m lerobot.find_port.py >>> Finding all available ports for the MotorsBus. >>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751'] >>> Remove the usb cable from your MotorsBus and press Enter when done. @@ -237,20 +235,20 @@ class MotorsBus(abc.ABC): Example of usage for 1 Feetech sts3215 motor connected to the bus: ```python - motors_bus = FeetechMotorsBus( + bus = FeetechMotorsBus( port="/dev/tty.usbmodem575E0031751", - motors={"gripper": (6, "sts3215")}, + motors={"my_motor": (1, "sts3215")}, ) - motors_bus.connect() + bus.connect() - position = motors_bus.read("Present_Position") + position = bus.read("Present_Position", normalize=False) # Move from a few motor steps as an example few_steps = 30 - motors_bus.write("Goal_Position", position + few_steps) + bus.write("Goal_Position", position + few_steps, normalize=False) # When done, properly disconnect the port using - motors_bus.disconnect() + bus.disconnect() ``` """ @@ -403,9 +401,20 @@ class MotorsBus(abc.ABC): @property def is_connected(self) -> bool: + """bool: `True` if the underlying serial port is open.""" return self.port_handler.is_open def connect(self, handshake: bool = True) -> None: + """Open the serial port and initialise communication. + + Args: + handshake (bool, optional): Pings every expected motor and performs additional + integrity checks specific to the implementation. Defaults to `True`. + + Raises: + DeviceAlreadyConnectedError: The port is already open. + ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed. + """ if self.is_connected: raise DeviceAlreadyConnectedError( f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice." @@ -431,8 +440,39 @@ class MotorsBus(abc.ABC): def _handshake(self) -> None: pass + def disconnect(self, disable_torque: bool = True) -> None: + """Close the serial port (optionally disabling torque first). + + Args: + disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before + closing the port. This can prevent damaging motors if they are left applying resisting torque + after disconnect. + """ + if not self.is_connected: + raise DeviceNotConnectedError( + f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first." + ) + + if disable_torque: + self.port_handler.clearPort() + self.port_handler.is_using = False + self.disable_torque(num_retry=5) + + self.port_handler.closePort() + logger.debug(f"{self.__class__.__name__} disconnected.") + @classmethod def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]: + """Probe *port* at every supported baud-rate and list responding IDs. + + Args: + port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``). + *args, **kwargs: Forwarded to the subclass constructor. + + Returns: + dict[int, list[int]]: Mapping *baud-rate → list of motor IDs* + for every baud-rate that produced at least one response. + """ bus = cls(port, {}, *args, **kwargs) bus._connect(handshake=False) baudrate_ids = {} @@ -448,6 +488,22 @@ class MotorsBus(abc.ABC): def setup_motor( self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None ) -> None: + """Assign the correct ID and baud-rate to a single motor. + + This helper temporarily switches to the motor's current settings, disables torque, sets the desired + ID, and finally programs the bus' default baud-rate. + + Args: + motor (str): Key of the motor in :pyattr:`motors`. + initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided). + Defaults to None. + initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None. + + Raises: + RuntimeError: The motor could not be found or its model number + does not match the expected one. + ConnectionError: Communication with the motor failed. + """ if not self.is_connected: self._connect(handshake=False) @@ -479,10 +535,25 @@ class MotorsBus(abc.ABC): @abc.abstractmethod def configure_motors(self) -> None: + """Write implementation-specific recommended settings to every motor. + + Typical changes include shortening the return delay, increasing + acceleration limits or disabling safety locks. + """ pass @abc.abstractmethod def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None: + """Disable torque on selected motors. + + Disabling Torque allows to write to the motors' permanent memory area (EPROM/EEPROM). + + Args: + motors (int | str | list[str] | None, optional): Target motors. Accepts a motor name, an ID, a + list of names or `None` to affect every registered motor. Defaults to `None`. + num_retry (int, optional): Number of additional retry attempts on communication failure. + Defaults to 0. + """ pass @abc.abstractmethod @@ -491,10 +562,26 @@ class MotorsBus(abc.ABC): @abc.abstractmethod def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + """Enable torque on selected motors. + + Args: + motor (int): Same semantics as :pymeth:`disable_torque`. Defaults to `None`. + model (str): _description_ + num_retry (int, optional): _description_. Defaults to 0. + """ pass @contextmanager def torque_disabled(self): + """Context-manager that guarantees torque is re-enabled. + + This helper is useful to temporarily disable torque when configuring motors. + + Examples: + >>> with bus.torque_disabled(): + ... # Safe operations here + ... pass + """ self.disable_torque() try: yield @@ -502,34 +589,74 @@ class MotorsBus(abc.ABC): self.enable_torque() def set_timeout(self, timeout_ms: int | None = None): + """Change the packet timeout used by the SDK. + + Args: + timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls + back to :pyattr:`default_timeout`. + """ timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout self.port_handler.setPacketTimeoutMillis(timeout_ms) def get_baudrate(self) -> int: + """Return the current baud-rate configured on the port. + + Returns: + int: Baud-rate in bits / second. + """ return self.port_handler.getBaudRate() def set_baudrate(self, baudrate: int) -> None: + """Set a new UART baud-rate on the port. + + Args: + baudrate (int): Desired baud-rate in bits / second. + + Raises: + RuntimeError: The SDK failed to apply the change. + """ present_bus_baudrate = self.port_handler.getBaudRate() if present_bus_baudrate != baudrate: logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.") self.port_handler.setBaudRate(baudrate) if self.port_handler.getBaudRate() != baudrate: - raise OSError("Failed to write bus baud rate.") + raise RuntimeError("Failed to write bus baud rate.") @property def is_calibrated(self) -> bool: + """bool: ``True`` if the cached calibration matches the motors.""" return self.calibration == self.read_calibration() @abc.abstractmethod def read_calibration(self) -> dict[str, MotorCalibration]: + """Read calibration parameters from the motors. + + Returns: + dict[str, MotorCalibration]: Mapping *motor name → calibration*. + """ pass @abc.abstractmethod def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: + """Write calibration parameters to the motors and cache them. + + Args: + calibration_dict (dict[str, MotorCalibration]): Calibration obtained from + :pymeth:`read_calibration` or crafted by the user. + """ pass def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + """Restore factory calibration for the selected motors. + + Homing offset is set to ``0`` and min/max position limits are set to the full usable range. + The in-memory :pyattr:`calibration` is cleared. + + Args: + motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default) + resets every motor. + """ if motors is None: motors = list(self.motors) elif isinstance(motors, (str, int)): @@ -547,24 +674,16 @@ class MotorsBus(abc.ABC): self.calibration = {} def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]: - """ - This assumes motors present positions are roughly in the middle of their desired range + """Centre each motor range around its current position. - Step 1: Set homing and min max to 0 + The function computes and writes a homing offset such that the present position becomes exactly one + half-turn (e.g. `2047` on a 12-bit encoder). - Step 2: Read Present_Position which will be Actual_Position since - Present_Position = Actual_Position ± Homing_Offset (1) - and Homing_Offset = 0 from step 1 + Args: + motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`). - Step 3: We want to set the Homing_Offset such that the current Present_Position to be half range of 1 - revolution. For instance, if 1 revolution corresponds to 4095 (4096 steps), this means we want the - current Present_Position to be 2047. - - In that example: - Present_Position = 2047 (2) - Actual_Position = X (read in step 2) - from (1) and (2): - => Homing_Offset = ±(X - 2048) + Returns: + dict[NameOrID, Value]: Mapping *motor → written homing offset*. """ if motors is None: motors = list(self.motors) @@ -588,10 +707,19 @@ class MotorsBus(abc.ABC): def record_ranges_of_motion( self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True ) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: - """ - This assumes that the homing offsets have been set such that all possible values in the range of - motion are positive and that the zero is not crossed. To that end, `set_half_turn_homings` should - typically be called prior to this. + """Interactively record the min/max encoder values of each motor. + + Move the joints by hand (with torque disabled) while the method streams live positions. Press + :kbd:`Enter` to finish. + + Args: + motors (NameOrID | list[NameOrID] | None, optional): Motors to record. + Defaults to every motor (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Returns: + tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the + extreme values observed for each motor. """ if motors is None: motors = list(self.motors) @@ -621,6 +749,7 @@ class MotorsBus(abc.ABC): # Move cursor up to overwrite the previous output move_cursor_up(len(motors) + 3) + # TODO(Steven, aliberts): add check to ensure mins and maxes are different return mins, maxes def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: @@ -702,6 +831,17 @@ class MotorsBus(abc.ABC): pass def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None: + """Ping a single motor and return its model number. + + Args: + motor (NameOrID): Target motor (name or ID). + num_retry (int, optional): Extra attempts before giving up. Defaults to `0`. + raise_on_error (bool, optional): If `True` communication errors raise exceptions instead of + returning `None`. Defaults to `False`. + + Returns: + int | None: Motor model number or `None` on failure. + """ id_ = self._get_motor_id(motor) for n_try in range(1 + num_retry): model_number, comm, error = self.packet_handler.ping(self.port_handler, id_) @@ -724,6 +864,16 @@ class MotorsBus(abc.ABC): @abc.abstractmethod def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None: + """Ping every ID on the bus using the broadcast address. + + Args: + num_retry (int, optional): Retry attempts. Defaults to `0`. + raise_on_error (bool, optional): When `True` failures raise an exception instead of returning + `None`. Defaults to `False`. + + Returns: + dict[int, int] | None: Mapping *id → model number* or `None` if the call failed. + """ pass def read( @@ -734,6 +884,18 @@ class MotorsBus(abc.ABC): normalize: bool = True, num_retry: int = 0, ) -> Value: + """Read a register from a motor. + + Args: + data_name (str): Control-table key (e.g. `"Present_Position"`). + motor (str): Motor name. + normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as + defined by the calibration. + num_retry (int, optional): Retry attempts. Defaults to `0`. + + Returns: + Value: Raw or normalised value depending on *normalize*. + """ if not self.is_connected: raise DeviceNotConnectedError( f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." @@ -791,6 +953,21 @@ class MotorsBus(abc.ABC): def write( self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0 ) -> None: + """Write a value to a single motor's register. + + Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which + provides a guarantee that the value was written to the register successfully. In consequence, it is + slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring + motors. + + Args: + data_name (str): Register name. + motor (str): Motor name. + value (Value): Value to write. If *normalize* is `True` the value is first converted to raw + units. + normalize (bool, optional): Enable or disable normalisation. Defaults to `True`. + num_retry (int, optional): Retry attempts. Defaults to `0`. + """ if not self.is_connected: raise DeviceNotConnectedError( f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." @@ -844,6 +1021,17 @@ class MotorsBus(abc.ABC): normalize: bool = True, num_retry: int = 0, ) -> dict[str, Value]: + """Read the same register from several motors at once. + + Args: + data_name (str): Register name. + motors (str | list[str] | None, optional): Motors to query. `None` (default) reads every motor. + normalize (bool, optional): Normalisation flag. Defaults to `True`. + num_retry (int, optional): Retry attempts. Defaults to `0`. + + Returns: + dict[str, Value]: Mapping *motor name → value*. + """ if not self.is_connected: raise DeviceNotConnectedError( f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." @@ -928,6 +1116,19 @@ class MotorsBus(abc.ABC): normalize: bool = True, num_retry: int = 0, ) -> None: + """Write the same register on multiple motors. + + Contrary to :pymeth:`write`, this *does not* expects a response status packet emitted by the motor, which + can allow for lost packets. It is faster than :pymeth:`write` and should typically be used when + frequency matters and losing some packets is acceptable (e.g. teleoperation loops). + + Args: + data_name (str): Register name. + values (Value | dict[str, Value]): Either a single value (applied to every motor) or a mapping + *motor name → value*. + normalize (bool, optional): If `True` (default) convert values from the user range to raw units. + num_retry (int, optional): Retry attempts. Defaults to `0`. + """ if not self.is_connected: raise DeviceNotConnectedError( f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." @@ -980,17 +1181,3 @@ class MotorsBus(abc.ABC): for id_, value in ids_values.items(): data = self._serialize_data(value, length) self.sync_writer.addParam(id_, data) - - def disconnect(self, disable_torque: bool = True) -> None: - if not self.is_connected: - raise DeviceNotConnectedError( - f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first." - ) - - if disable_torque: - self.port_handler.clearPort() - self.port_handler.is_using = False - self.disable_torque(num_retry=5) - - self.port_handler.closePort() - logger.debug(f"{self.__class__.__name__} disconnected.")