Add calibration tests
This commit is contained in:
@@ -493,6 +493,37 @@ class MockMotors(MockSerial):
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sequential_sync_read_stub(
|
||||
self, data_name: str, ids_values: dict[int, list[int]] | None = None
|
||||
) -> str:
|
||||
"""
|
||||
'data_name' supported:
|
||||
- Present_Position
|
||||
"""
|
||||
sequence_length = len(next(iter(ids_values.values())))
|
||||
assert all(len(positions) == sequence_length for positions in ids_values.values())
|
||||
if data_name != "Present_Position":
|
||||
raise NotImplementedError
|
||||
|
||||
address, length = self.ctrl_table[data_name]
|
||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||
sequential_packets = []
|
||||
for count in range(sequence_length):
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.present_position(id_, positions[count])
|
||||
for id_, positions in ids_values.items()
|
||||
)
|
||||
sequential_packets.append(return_packets)
|
||||
|
||||
sync_read_response = self._build_sequential_send_fn(sequential_packets)
|
||||
stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=sync_read_request,
|
||||
send_fn=sync_read_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sync_write_stub(
|
||||
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
@@ -528,3 +559,10 @@ class MockMotors(MockSerial):
|
||||
return packet
|
||||
|
||||
return send_fn
|
||||
|
||||
@staticmethod
|
||||
def _build_sequential_send_fn(packets: list[bytes]) -> Callable[[int], bytes]:
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
return packets[_call_count - 1]
|
||||
|
||||
return send_fn
|
||||
|
||||
@@ -404,6 +404,37 @@ class MockMotors(MockSerial):
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sequential_sync_read_stub(
|
||||
self, data_name: str, ids_values: dict[int, list[int]] | None = None
|
||||
) -> str:
|
||||
"""
|
||||
'data_name' supported:
|
||||
- Present_Position
|
||||
"""
|
||||
sequence_length = len(next(iter(ids_values.values())))
|
||||
assert all(len(positions) == sequence_length for positions in ids_values.values())
|
||||
if data_name != "Present_Position":
|
||||
raise NotImplementedError
|
||||
|
||||
address, length = self.ctrl_table[data_name]
|
||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||
sequential_packets = []
|
||||
for count in range(sequence_length):
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.present_position(id_, positions[count])
|
||||
for id_, positions in ids_values.items()
|
||||
)
|
||||
sequential_packets.append(return_packets)
|
||||
|
||||
sync_read_response = self._build_sequential_send_fn(sequential_packets)
|
||||
stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=sync_read_request,
|
||||
send_fn=sync_read_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sync_write_stub(
|
||||
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
@@ -439,3 +470,10 @@ class MockMotors(MockSerial):
|
||||
return packet
|
||||
|
||||
return send_fn
|
||||
|
||||
@staticmethod
|
||||
def _build_sequential_send_fn(packets: list[bytes]) -> Callable[[int], bytes]:
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
return packets[_call_count - 1]
|
||||
|
||||
return send_fn
|
||||
|
||||
Reference in New Issue
Block a user