Add more segmented tests (base motor bus & feetech), add feetech protocol 1 support

This commit is contained in:
Simon Alibert
2025-04-14 11:56:53 +02:00
parent e0b292ab51
commit bdbca09cb2
6 changed files with 749 additions and 350 deletions

View File

@@ -10,27 +10,6 @@ from lerobot.common.motors.feetech.feetech import _split_into_byte_chunks, patch
from .mock_serial_patch import WaitableStub
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
INSTRUCTION_TYPES = {
"Read": scs.INST_PING, # Read data from the Device
"Ping": scs.INST_READ, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
"Write": scs.INST_WRITE, # Write data to the Device
"Reg_Write": scs.INST_REG_WRITE, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
"Action": scs.INST_ACTION, # Executes a Packet that was registered beforehand using Reg Write
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
"Sync_Write": scs.INST_SYNC_WRITE, # Write data to multiple devices with the same Address with the same length at once
"Sync_Read": scs.INST_SYNC_READ, # Read data from multiple devices with the same Address with the same length at once
} # fmt: skip
ERROR_TYPE = {
"Success": 0x00,
"Voltage": scs.ERRBIT_VOLTAGE,
"Angle": scs.ERRBIT_ANGLE,
"Overheat": scs.ERRBIT_OVERHEAT,
"Overele": scs.ERRBIT_OVERELE,
"Overload": scs.ERRBIT_OVERLOAD,
}
class MockFeetechPacket(abc.ABC):
@classmethod
@@ -68,15 +47,14 @@ class MockInstructionPacket(MockFeetechPacket):
"""
@classmethod
def _build(cls, scs_id: int, params: list[int], length: int, instruct_type: str) -> list[int]:
instruct_value = INSTRUCTION_TYPES[instruct_type]
def _build(cls, scs_id: int, params: list[int], length: int, instruction: int) -> list[int]:
return [
0xFF, 0xFF, # header
scs_id, # servo id
length, # length
instruct_value, # instruction type
*params, # data bytes
0x00, # placeholder for checksum
0xFF, 0xFF, # header
scs_id, # servo id
length, # length
instruction, # instruction type
*params, # data bytes
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
@@ -89,7 +67,7 @@ class MockInstructionPacket(MockFeetechPacket):
No parameters required.
"""
return cls.build(scs_id=scs_id, params=[], length=2, instruct_type="Ping")
return cls.build(scs_id=scs_id, params=[], length=2, instruction=scs.INST_PING)
@classmethod
def read(
@@ -113,7 +91,7 @@ class MockInstructionPacket(MockFeetechPacket):
"""
params = [start_address, data_length]
length = 4
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Read")
return cls.build(scs_id=scs_id, params=params, length=length, instruction=scs.INST_READ)
@classmethod
def write(
@@ -142,7 +120,7 @@ class MockInstructionPacket(MockFeetechPacket):
data = _split_into_byte_chunks(value, data_length)
params = [start_address, *data]
length = data_length + 3
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Write")
return cls.build(scs_id=scs_id, params=params, length=length, instruction=scs.INST_WRITE)
@classmethod
def sync_read(
@@ -167,7 +145,9 @@ class MockInstructionPacket(MockFeetechPacket):
"""
params = [start_address, data_length, *scs_ids]
length = len(scs_ids) + 4
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read")
return cls.build(
scs_id=scs.BROADCAST_ID, params=params, length=length, instruction=scs.INST_SYNC_READ
)
@classmethod
def sync_write(
@@ -205,7 +185,9 @@ class MockInstructionPacket(MockFeetechPacket):
data += [id_, *split_value]
params = [start_address, data_length, *data]
length = len(ids_values) * (1 + data_length) + 4
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write")
return cls.build(
scs_id=scs.BROADCAST_ID, params=params, length=length, instruction=scs.INST_SYNC_WRITE
)
class MockStatusPacket(MockFeetechPacket):
@@ -222,19 +204,18 @@ class MockStatusPacket(MockFeetechPacket):
"""
@classmethod
def _build(cls, scs_id: int, params: list[int], length: int, error: str = "Success") -> list[int]:
err_byte = ERROR_TYPE[error]
def _build(cls, scs_id: int, params: list[int], length: int, error: int = 0) -> list[int]:
return [
0xFF, 0xFF, # header
scs_id, # servo id
length, # length
err_byte, # status
error, # status
*params, # data bytes
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def ping(cls, scs_id: int, error: str = "Success") -> bytes:
def ping(cls, scs_id: int, error: int = 0) -> bytes:
"""Builds a 'Ping' status packet.
Args:
@@ -247,7 +228,7 @@ class MockStatusPacket(MockFeetechPacket):
return cls.build(scs_id, params=[], length=2, error=error)
@classmethod
def read(cls, scs_id: int, value: int, param_length: int) -> bytes:
def read(cls, scs_id: int, value: int, param_length: int, error: int = 0) -> bytes:
"""Builds a 'Read' status packet.
Args:
@@ -260,7 +241,7 @@ class MockStatusPacket(MockFeetechPacket):
"""
params = _split_into_byte_chunks(value, param_length)
length = param_length + 2
return cls.build(scs_id, params=params, length=length)
return cls.build(scs_id, params=params, length=length, error=error)
class MockPortHandler(scs.PortHandler):
@@ -323,11 +304,11 @@ class MockMotors(MockSerial):
)
return stub_name
def build_ping_stub(self, scs_id: int, num_invalid_try: int = 0) -> str:
def build_ping_stub(self, scs_id: int, num_invalid_try: int = 0, error: int = 0) -> str:
ping_request = MockInstructionPacket.ping(scs_id)
return_packet = MockStatusPacket.ping(scs_id)
return_packet = MockStatusPacket.ping(scs_id, error)
ping_response = self._build_send_fn(return_packet, num_invalid_try)
stub_name = f"Ping_{scs_id}"
stub_name = f"Ping_{scs_id}_{error}"
self.stub(
name=stub_name,
receive_bytes=ping_request,
@@ -336,13 +317,19 @@ class MockMotors(MockSerial):
return stub_name
def build_read_stub(
self, data_name: str, scs_id: int, value: int | None = None, num_invalid_try: int = 0
self,
address: int,
length: int,
scs_id: int,
value: int,
reply: bool = True,
error: int = 0,
num_invalid_try: int = 0,
) -> str:
address, length = self.ctrl_table[data_name]
read_request = MockInstructionPacket.read(scs_id, address, length)
return_packet = MockStatusPacket.read(scs_id, value, length)
return_packet = MockStatusPacket.read(scs_id, value, length, error) if reply else b""
read_response = self._build_send_fn(return_packet, num_invalid_try)
stub_name = f"Read_{data_name}_{scs_id}"
stub_name = f"Read_{address}_{length}_{scs_id}_{value}_{error}"
self.stub(
name=stub_name,
receive_bytes=read_request,
@@ -350,15 +337,42 @@ class MockMotors(MockSerial):
)
return stub_name
def build_sync_read_stub(
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
def build_write_stub(
self,
address: int,
length: int,
scs_id: int,
value: int,
reply: bool = True,
error: int = 0,
num_invalid_try: int = 0,
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
return_packets = b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items())
sync_read_request = MockInstructionPacket.write(scs_id, value, address, length)
return_packet = MockStatusPacket.build(scs_id, params=[], length=2, error=error) if reply else b""
stub_name = f"Write_{address}_{length}_{scs_id}"
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=self._build_send_fn(return_packet, num_invalid_try),
)
return stub_name
def build_sync_read_stub(
self,
address: int,
length: int,
ids_values: dict[int, int],
reply: bool = True,
num_invalid_try: int = 0,
) -> str:
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
return_packets = (
b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items())
if reply
else b""
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
stub_name = f"Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
@@ -367,11 +381,10 @@ class MockMotors(MockSerial):
return stub_name
def build_sequential_sync_read_stub(
self, data_name: str, ids_values: dict[int, list[int]] | None = None
self, address: int, length: int, ids_values: dict[int, list[int]] | None = None
) -> str:
sequence_length = len(next(iter(ids_values.values())))
assert all(len(positions) == sequence_length for positions in ids_values.values())
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
sequential_packets = []
for count in range(sequence_length):
@@ -381,7 +394,7 @@ class MockMotors(MockSerial):
sequential_packets.append(return_packets)
sync_read_response = self._build_sequential_send_fn(sequential_packets)
stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
stub_name = f"Seq_Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
@@ -390,11 +403,10 @@ class MockMotors(MockSerial):
return stub_name
def build_sync_write_stub(
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
self, address: int, length: int, ids_values: dict[int, int], num_invalid_try: int = 0
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
stub_name = f"Sync_Write_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
@@ -402,20 +414,6 @@ class MockMotors(MockSerial):
)
return stub_name
def build_write_stub(
self, data_name: str, scs_id: int, value: int, error: str = "Success", num_invalid_try: int = 0
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.write(scs_id, value, address, length)
return_packet = MockStatusPacket.build(scs_id, params=[], length=2, error=error)
stub_name = f"Write_{data_name}_{scs_id}"
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=self._build_send_fn(return_packet, num_invalid_try),
)
return stub_name
@staticmethod
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
def send_fn(_call_count: int) -> bytes: