Add _configure_motors & move ping methods
This commit is contained in:
@@ -91,16 +91,11 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
self._comm_success = dxl.COMM_SUCCESS
|
self._comm_success = dxl.COMM_SUCCESS
|
||||||
self._no_error = 0x00
|
self._no_error = 0x00
|
||||||
|
|
||||||
def broadcast_ping(
|
def _configure_motors(self) -> None:
|
||||||
self, num_retry: int = 0, raise_on_error: bool = False
|
# By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on
|
||||||
) -> dict[int, list[int, int]] | None:
|
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
||||||
for _ in range(1 + num_retry):
|
for id_ in self.ids:
|
||||||
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
self.write("Return_Delay_Time", id_, 0)
|
||||||
if self._is_comm_success(comm):
|
|
||||||
return data_list
|
|
||||||
|
|
||||||
if raise_on_error:
|
|
||||||
raise ConnectionError(f"Broadcast ping returned a {comm} comm error.")
|
|
||||||
|
|
||||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
# TODO
|
# TODO
|
||||||
@@ -139,3 +134,14 @@ class DynamixelMotorsBus(MotorsBus):
|
|||||||
dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)),
|
dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)),
|
||||||
]
|
]
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def broadcast_ping(
|
||||||
|
self, num_retry: int = 0, raise_on_error: bool = False
|
||||||
|
) -> dict[int, list[int, int]] | None:
|
||||||
|
for _ in range(1 + num_retry):
|
||||||
|
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
||||||
|
if self._is_comm_success(comm):
|
||||||
|
return data_list
|
||||||
|
|
||||||
|
if raise_on_error:
|
||||||
|
raise ConnectionError(f"Broadcast ping returned a {comm} comm error.")
|
||||||
|
|||||||
@@ -71,11 +71,11 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
self._comm_success = scs.COMM_SUCCESS
|
self._comm_success = scs.COMM_SUCCESS
|
||||||
self._no_error = 0x00
|
self._no_error = 0x00
|
||||||
|
|
||||||
def broadcast_ping(
|
def _configure_motors(self) -> None:
|
||||||
self, num_retry: int = 0, raise_on_error: bool = False
|
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the
|
||||||
) -> dict[int, list[int, int]] | None:
|
# 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
||||||
# TODO
|
for id_ in self.ids:
|
||||||
raise NotImplementedError
|
self.write("Return_Delay", id_, 0)
|
||||||
|
|
||||||
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
# TODO
|
# TODO
|
||||||
@@ -114,3 +114,9 @@ class FeetechMotorsBus(MotorsBus):
|
|||||||
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
|
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
|
||||||
]
|
]
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def broadcast_ping(
|
||||||
|
self, num_retry: int = 0, raise_on_error: bool = False
|
||||||
|
) -> dict[int, list[int, int]] | None:
|
||||||
|
# TODO
|
||||||
|
raise NotImplementedError
|
||||||
|
|||||||
@@ -351,6 +351,10 @@ class MotorsBus(abc.ABC):
|
|||||||
self.set_timeout()
|
self.set_timeout()
|
||||||
logger.debug(f"{self.__class__.__name__} connected.")
|
logger.debug(f"{self.__class__.__name__} connected.")
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _configure_motors(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
def set_timeout(self, timeout_ms: int | None = None):
|
def set_timeout(self, timeout_ms: int | None = None):
|
||||||
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
|
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
|
||||||
self.port_handler.setPacketTimeoutMillis(timeout_ms)
|
self.port_handler.setPacketTimeoutMillis(timeout_ms)
|
||||||
@@ -380,23 +384,6 @@ class MotorsBus(abc.ABC):
|
|||||||
logger.error(e)
|
logger.error(e)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
|
||||||
idx = self._get_motor_id(motor)
|
|
||||||
for n_try in range(1 + num_retry):
|
|
||||||
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx)
|
|
||||||
if self._is_comm_success(comm):
|
|
||||||
return model_number
|
|
||||||
logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}")
|
|
||||||
|
|
||||||
if raise_on_error:
|
|
||||||
raise ConnectionError(f"Ping motor {motor} returned a {error} error code.")
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def broadcast_ping(
|
|
||||||
self, num_retry: int = 0, raise_on_error: bool = False
|
|
||||||
) -> dict[int, list[int, int]] | None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def set_calibration(self, calibration_fpath: Path) -> None:
|
def set_calibration(self, calibration_fpath: Path) -> None:
|
||||||
with open(calibration_fpath) as f:
|
with open(calibration_fpath) as f:
|
||||||
calibration = json.load(f)
|
calibration = json.load(f)
|
||||||
@@ -449,6 +436,23 @@ class MotorsBus(abc.ABC):
|
|||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
||||||
|
idx = self._get_motor_id(motor)
|
||||||
|
for n_try in range(1 + num_retry):
|
||||||
|
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx)
|
||||||
|
if self._is_comm_success(comm):
|
||||||
|
return model_number
|
||||||
|
logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}")
|
||||||
|
|
||||||
|
if raise_on_error:
|
||||||
|
raise ConnectionError(f"Ping motor {motor} returned a {error} error code.")
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def broadcast_ping(
|
||||||
|
self, num_retry: int = 0, raise_on_error: bool = False
|
||||||
|
) -> dict[int, list[int, int]] | None:
|
||||||
|
pass
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def sync_read(self, data_name: str, motors: None = ..., num_retry: int = ...) -> dict[str, Value]: ...
|
def sync_read(self, data_name: str, motors: None = ..., num_retry: int = ...) -> dict[str, Value]: ...
|
||||||
@overload
|
@overload
|
||||||
|
|||||||
Reference in New Issue
Block a user