(nit) move write

This commit is contained in:
Simon Alibert
2025-04-10 00:51:23 +02:00
parent 443fed216c
commit 4005065223
3 changed files with 107 additions and 107 deletions

View File

@@ -784,6 +784,50 @@ class MotorsBus(abc.ABC):
return value, comm, error
def write(
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
id_ = self.motors[motor].id
model = self.motors[motor].model
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
if normalize and data_name in self.normalized_data:
value = self._unnormalize(data_name, {id_: value})[id_]
value = self._encode_sign(data_name, {id_: value})[id_]
comm, error = self._write(addr, n_bytes, id_, value, num_retry=num_retry)
if not self._is_comm_success(comm):
raise ConnectionError(
f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getTxRxResult(comm)}"
)
elif self._is_error(error):
raise RuntimeError(
f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getRxPacketError(error)}"
)
def _write(
self, addr: int, n_bytes: int, motor_id: int, value: int, num_retry: int = 0
) -> tuple[int, int]:
data = self._split_int_to_bytes(value, n_bytes)
for n_try in range(1 + num_retry):
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
if self._is_comm_success(comm):
break
logger.debug(
f"Failed to sync write @{addr=} ({n_bytes=}) on id={motor_id} with {value=} ({n_try=}): "
+ self.packet_handler.getTxRxResult(comm)
)
return comm, error
def sync_read(
self,
data_name: str,
@@ -914,50 +958,6 @@ class MotorsBus(abc.ABC):
data = self._split_int_to_bytes(value, n_bytes)
self.sync_writer.addParam(id_, data)
def write(
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
id_ = self.motors[motor].id
model = self.motors[motor].model
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
if normalize and data_name in self.normalized_data:
value = self._unnormalize(data_name, {id_: value})[id_]
value = self._encode_sign(data_name, {id_: value})[id_]
comm, error = self._write(addr, n_bytes, id_, value, num_retry=num_retry)
if not self._is_comm_success(comm):
raise ConnectionError(
f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getTxRxResult(comm)}"
)
elif self._is_error(error):
raise RuntimeError(
f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getRxPacketError(error)}"
)
def _write(
self, addr: int, n_bytes: int, motor_id: int, value: int, num_retry: int = 0
) -> tuple[int, int]:
data = self._split_int_to_bytes(value, n_bytes)
for n_try in range(1 + num_retry):
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
if self._is_comm_success(comm):
break
logger.debug(
f"Failed to sync write @{addr=} ({n_bytes=}) on id={motor_id} with {value=} ({n_try=}): "
+ self.packet_handler.getTxRxResult(comm)
)
return comm, error
def disconnect(self, disable_torque: bool = True) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(