diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index 3a8a80be..c26114ad 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -91,16 +91,11 @@ class DynamixelMotorsBus(MotorsBus): self._comm_success = dxl.COMM_SUCCESS self._no_error = 0x00 - def broadcast_ping( - self, num_retry: int = 0, raise_on_error: bool = False - ) -> dict[int, list[int, int]] | None: - for _ in range(1 + num_retry): - data_list, comm = self.packet_handler.broadcastPing(self.port_handler) - if self._is_comm_success(comm): - return data_list - - if raise_on_error: - raise ConnectionError(f"Broadcast ping returned a {comm} comm error.") + def _configure_motors(self) -> None: + # By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on + # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). + for id_ in self.ids: + self.write("Return_Delay_Time", id_, 0) def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: # TODO @@ -139,3 +134,14 @@ class DynamixelMotorsBus(MotorsBus): dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)), ] return data + + def broadcast_ping( + self, num_retry: int = 0, raise_on_error: bool = False + ) -> dict[int, list[int, int]] | None: + for _ in range(1 + num_retry): + data_list, comm = self.packet_handler.broadcastPing(self.port_handler) + if self._is_comm_success(comm): + return data_list + + if raise_on_error: + raise ConnectionError(f"Broadcast ping returned a {comm} comm error.") diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index 98144473..78c30045 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -71,11 +71,11 @@ class FeetechMotorsBus(MotorsBus): self._comm_success = scs.COMM_SUCCESS self._no_error = 0x00 - def broadcast_ping( - self, num_retry: int = 0, raise_on_error: bool = False - ) -> dict[int, list[int, int]] | None: - # TODO - raise NotImplementedError + def _configure_motors(self) -> None: + # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the + # 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0). + for id_ in self.ids: + self.write("Return_Delay", id_, 0) def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: # TODO @@ -114,3 +114,9 @@ class FeetechMotorsBus(MotorsBus): scs.SCS_HIBYTE(scs.SCS_HIWORD(value)), ] return data + + def broadcast_ping( + self, num_retry: int = 0, raise_on_error: bool = False + ) -> dict[int, list[int, int]] | None: + # TODO + raise NotImplementedError diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 8329c0a0..25b01c5d 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -351,6 +351,10 @@ class MotorsBus(abc.ABC): self.set_timeout() logger.debug(f"{self.__class__.__name__} connected.") + @abc.abstractmethod + def _configure_motors(self) -> None: + pass + def set_timeout(self, timeout_ms: int | None = None): timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout self.port_handler.setPacketTimeoutMillis(timeout_ms) @@ -380,23 +384,6 @@ class MotorsBus(abc.ABC): logger.error(e) return False - def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None: - idx = self._get_motor_id(motor) - for n_try in range(1 + num_retry): - model_number, comm, error = self.packet_handler.ping(self.port_handler, idx) - if self._is_comm_success(comm): - return model_number - logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}") - - if raise_on_error: - raise ConnectionError(f"Ping motor {motor} returned a {error} error code.") - - @abc.abstractmethod - def broadcast_ping( - self, num_retry: int = 0, raise_on_error: bool = False - ) -> dict[int, list[int, int]] | None: - pass - def set_calibration(self, calibration_fpath: Path) -> None: with open(calibration_fpath) as f: calibration = json.load(f) @@ -449,6 +436,23 @@ class MotorsBus(abc.ABC): """ pass + def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None: + idx = self._get_motor_id(motor) + for n_try in range(1 + num_retry): + model_number, comm, error = self.packet_handler.ping(self.port_handler, idx) + if self._is_comm_success(comm): + return model_number + logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}") + + if raise_on_error: + raise ConnectionError(f"Ping motor {motor} returned a {error} error code.") + + @abc.abstractmethod + def broadcast_ping( + self, num_retry: int = 0, raise_on_error: bool = False + ) -> dict[int, list[int, int]] | None: + pass + @overload def sync_read(self, data_name: str, motors: None = ..., num_retry: int = ...) -> dict[str, Value]: ... @overload