Improve feetech mocking

This commit is contained in:
Simon Alibert
2025-03-22 01:19:51 +01:00
parent fc4a95f187
commit 857f335be9
2 changed files with 179 additions and 99 deletions

View File

@@ -77,6 +77,19 @@ class MockInstructionPacket(MockFeetechPacket):
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def ping(
cls,
scs_id: int,
) -> bytes:
"""
Builds a "Ping" broadcast instruction.
No parameters required.
"""
params, length = [], 2
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Ping")
@classmethod
def sync_read(
cls,
@@ -128,6 +141,25 @@ class MockStatusPacket(MockFeetechPacket):
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def ping(cls, scs_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
"""Builds a 'Ping' status packet.
Args:
scs_id (int): ID of the servo responding.
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
which corresponds to a XL330-M077-T.
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
Defaults to 50.
Returns:
bytes: The raw 'Ping' status packet ready to be sent through serial.
"""
# raise NotImplementedError
params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb), firm_ver]
length = 2
return cls.build(scs_id, params=params, length=length)
@classmethod
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' status packet.
@@ -184,62 +216,69 @@ class MockMotors(MockSerial):
ctrl_table = SCS_SERIES_CONTROL_TABLE
def __init__(self, scs_ids: list[int]):
def __init__(self):
super().__init__()
self._ids = scs_ids
self.open()
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
for idx in self._ids:
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
def build_broadcast_ping_stub(
self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
return_packets = b"".join(
MockStatusPacket.ping(idx, model, firm_ver)
for idx, (model, firm_ver) in ids_models_firmwares.items()
)
ping_response = self._build_send_fn(return_packets, num_invalid_try)
def _build_present_pos_send_fn(
self, scs_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
return_pos = [None for _ in scs_ids] if return_pos is None else return_pos
assert len(return_pos) == len(scs_ids)
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_ping_stub(
self, scs_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(scs_id)
return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver)
ping_response = self._build_send_fn(return_packet, num_invalid_try)
stub_name = f"Ping_{scs_id}"
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_sync_read_stub(
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
if data_name != "Present_Position":
raise NotImplementedError
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=sync_read_response,
)
return stub_name
@staticmethod
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
if num_invalid_try >= _call_count:
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(scs_ids, return_pos, strict=True)
)
return packets
return packet
return send_fn