Refactor MockMotors, add return values

This commit is contained in:
Simon Alibert
2025-03-20 09:40:58 +01:00
parent c1c71fb994
commit 2c1bb766ff
2 changed files with 86 additions and 58 deletions

View File

@@ -288,59 +288,72 @@ class MockPortHandler(dxl.PortHandler):
class MockMotors(MockSerial):
"""
This class will simulate physical motors by responding with valid status packets upon receiving some
instruction packets. It is meant to test MotorsBus classes.
'data_name' supported:
- Present_Position
"""
ctrl_table = X_SERIES_CONTROL_TABLE
def __init__(self, dlx_ids: list[int], default_stubs: bool = True):
def __init__(self, dlx_ids: list[int]):
super().__init__()
self._ids = dlx_ids
self.open()
if default_stubs:
self._create_stubs("Present_Position")
def _create_stubs(self, data_name: str):
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
# sync read all motors
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._create_present_pos_send_fn(self._ids, data_name)
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
)
# sync read single motors
for idx in self._ids:
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._create_present_pos_send_fn([idx], data_name)
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def _create_present_pos_send_fn(
self, dxl_ids: list[int], data_name: str, num_invalid_try: int | None = None
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
)
def _build_present_pos_send_fn(
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
# if data_name == "Present_Position":
# packet_generator = MockStatusPacket.present_position
# else:
# # TODO(aliberts): add "Goal_Position"
# raise NotImplementedError
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos
assert len(return_pos) == len(dxl_ids)
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
return bytes(0)
first_packet = MockStatusPacket.present_position(next(iter(dxl_ids)))
if len(dxl_ids) == 1:
return first_packet
packets = first_packet
for idx in dxl_ids:
packets += MockStatusPacket.present_position(dxl_id=idx)
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(dxl_ids, return_pos, strict=True)
)
return packets
return send_fn