forked from tangger/lerobot
Return models (str) with pings
This commit is contained in:
@@ -18,20 +18,22 @@
|
||||
# https://emanual.robotis.com/docs/en/dxl/protocol2/#fast-sync-read-0x8a
|
||||
# -> Need to check compatibility across models
|
||||
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from enum import Enum
|
||||
|
||||
from ..motors_bus import Motor, MotorsBus
|
||||
from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_RESOLUTION
|
||||
from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_NUMBER, MODEL_RESOLUTION
|
||||
|
||||
PROTOCOL_VERSION = 2.0
|
||||
BAUDRATE = 1_000_000
|
||||
DEFAULT_TIMEOUT_MS = 1000
|
||||
MAX_ID_RANGE = 252
|
||||
|
||||
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OperatingMode(Enum):
|
||||
# DYNAMIXEL only controls current(torque) regardless of speed and position. This mode is ideal for a
|
||||
@@ -73,6 +75,7 @@ class DynamixelMotorsBus(MotorsBus):
|
||||
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
||||
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
||||
model_number_table = deepcopy(MODEL_NUMBER)
|
||||
calibration_required = deepcopy(CALIBRATION_REQUIRED)
|
||||
default_timeout = DEFAULT_TIMEOUT_MS
|
||||
|
||||
@@ -135,13 +138,19 @@ class DynamixelMotorsBus(MotorsBus):
|
||||
]
|
||||
return data
|
||||
|
||||
def broadcast_ping(
|
||||
self, num_retry: int = 0, raise_on_error: bool = False
|
||||
) -> dict[int, list[int, int]] | None:
|
||||
for _ in range(1 + num_retry):
|
||||
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, str] | None:
|
||||
for n_try in range(1 + num_retry):
|
||||
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
||||
if self._is_comm_success(comm):
|
||||
return data_list
|
||||
break
|
||||
logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})")
|
||||
logger.debug(self.packet_handler.getRxPacketError(comm))
|
||||
|
||||
if raise_on_error:
|
||||
raise ConnectionError(f"Broadcast ping returned a {comm} comm error.")
|
||||
if not self._is_comm_success(comm):
|
||||
if raise_on_error:
|
||||
logger.error(self.packet_handler.getRxPacketError(comm))
|
||||
raise ConnectionError
|
||||
|
||||
return data_list if data_list else None
|
||||
|
||||
return {id_: self._model_nb_to_model(data[0]) for id_, data in data_list.items()}
|
||||
|
||||
@@ -242,6 +242,7 @@ class MotorsBus(abc.ABC):
|
||||
model_ctrl_table: dict[str, dict]
|
||||
model_resolution_table: dict[str, int]
|
||||
model_baudrate_table: dict[str, dict]
|
||||
model_number_table: dict[str, int]
|
||||
calibration_required: list[str]
|
||||
default_timeout: int
|
||||
|
||||
@@ -265,6 +266,7 @@ class MotorsBus(abc.ABC):
|
||||
|
||||
self._id_to_model_dict = {m.id: m.model for m in self.motors.values()}
|
||||
self._id_to_name_dict = {m.id: name for name, m in self.motors.items()}
|
||||
self._model_nb_to_model_dict = {v: k for k, v in self.model_number_table.items()}
|
||||
|
||||
def __len__(self):
|
||||
return len(self.motors)
|
||||
@@ -297,6 +299,9 @@ class MotorsBus(abc.ABC):
|
||||
def ids(self) -> list[int]:
|
||||
return [m.id for m in self.motors.values()]
|
||||
|
||||
def _model_nb_to_model(self, motor_nb: int) -> str:
|
||||
return self._model_nb_to_model_dict[motor_nb]
|
||||
|
||||
def _id_to_model(self, motor_id: int) -> str:
|
||||
return self._id_to_model_dict[motor_id]
|
||||
|
||||
@@ -436,21 +441,33 @@ class MotorsBus(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> str | None:
|
||||
idx = self._get_motor_id(motor)
|
||||
for n_try in range(1 + num_retry):
|
||||
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx)
|
||||
if self._is_comm_success(comm):
|
||||
return model_number
|
||||
break
|
||||
logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}")
|
||||
|
||||
if raise_on_error:
|
||||
raise ConnectionError(f"Ping motor {motor} returned a {error} error code.")
|
||||
if not self._is_comm_success(comm):
|
||||
if raise_on_error:
|
||||
logger.error(self.packet_handler.getRxPacketError(comm))
|
||||
raise ConnectionError
|
||||
else:
|
||||
return
|
||||
if self._is_error(error):
|
||||
if raise_on_error:
|
||||
logger.error(self.packet_handler.getTxRxResult(comm))
|
||||
raise ConnectionError
|
||||
else:
|
||||
return
|
||||
|
||||
return self._model_nb_to_model(model_number)
|
||||
|
||||
@abc.abstractmethod
|
||||
def broadcast_ping(
|
||||
self, num_retry: int = 0, raise_on_error: bool = False
|
||||
) -> dict[int, list[int, int]] | None:
|
||||
) -> dict[int, list[int, str]] | None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
|
||||
Reference in New Issue
Block a user