Rename idx -> id_
This commit is contained in:
@@ -281,9 +281,9 @@ class MockInstructionPacket(MockDynamixelPacketv2):
|
|||||||
+2 is for the CRC at the end.
|
+2 is for the CRC at the end.
|
||||||
"""
|
"""
|
||||||
data = []
|
data = []
|
||||||
for idx, value in ids_values.items():
|
for id_, value in ids_values.items():
|
||||||
split_value = DynamixelMotorsBus._split_int_to_bytes(value, data_length)
|
split_value = DynamixelMotorsBus._split_int_to_bytes(value, data_length)
|
||||||
data += [idx, *split_value]
|
data += [id_, *split_value]
|
||||||
params = [
|
params = [
|
||||||
dxl.DXL_LOBYTE(start_address),
|
dxl.DXL_LOBYTE(start_address),
|
||||||
dxl.DXL_HIBYTE(start_address),
|
dxl.DXL_HIBYTE(start_address),
|
||||||
@@ -444,10 +444,10 @@ class MockMotors(MockSerial):
|
|||||||
self, ids_models: dict[int, list[int]] | None = None, num_invalid_try: int = 0
|
self, ids_models: dict[int, list[int]] | None = None, num_invalid_try: int = 0
|
||||||
) -> str:
|
) -> str:
|
||||||
ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
|
ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
|
||||||
return_packets = b"".join(MockStatusPacket.ping(idx, model) for idx, model in ids_models.items())
|
return_packets = b"".join(MockStatusPacket.ping(id_, model) for id_, model in ids_models.items())
|
||||||
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
|
|
||||||
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models])
|
stub_name = "Ping_" + "_".join([str(id_) for id_ in ids_models])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=ping_request,
|
receive_bytes=ping_request,
|
||||||
@@ -482,10 +482,10 @@ class MockMotors(MockSerial):
|
|||||||
address, length = self.ctrl_table[data_name]
|
address, length = self.ctrl_table[data_name]
|
||||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||||
return_packets = b"".join(
|
return_packets = b"".join(
|
||||||
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items()
|
||||||
)
|
)
|
||||||
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=sync_read_request,
|
receive_bytes=sync_read_request,
|
||||||
@@ -498,7 +498,7 @@ class MockMotors(MockSerial):
|
|||||||
) -> str:
|
) -> str:
|
||||||
address, length = self.ctrl_table[data_name]
|
address, length = self.ctrl_table[data_name]
|
||||||
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
|
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
|
||||||
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=sync_read_request,
|
receive_bytes=sync_read_request,
|
||||||
|
|||||||
@@ -46,8 +46,8 @@ class MockFeetechPacket(abc.ABC):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _add_checksum(packet: list[int]) -> list[int]:
|
def _add_checksum(packet: list[int]) -> list[int]:
|
||||||
checksum = 0
|
checksum = 0
|
||||||
for idx in range(2, len(packet) - 1): # except header & checksum
|
for id_ in range(2, len(packet) - 1): # except header & checksum
|
||||||
checksum += packet[idx]
|
checksum += packet[id_]
|
||||||
|
|
||||||
packet[-1] = scs.SCS_LOBYTE(~checksum)
|
packet[-1] = scs.SCS_LOBYTE(~checksum)
|
||||||
|
|
||||||
@@ -171,9 +171,9 @@ class MockInstructionPacket(MockFeetechPacket):
|
|||||||
+1 is for the checksum at the end.
|
+1 is for the checksum at the end.
|
||||||
"""
|
"""
|
||||||
data = []
|
data = []
|
||||||
for idx, value in ids_values.items():
|
for id_, value in ids_values.items():
|
||||||
split_value = FeetechMotorsBus._split_int_to_bytes(value, data_length)
|
split_value = FeetechMotorsBus._split_int_to_bytes(value, data_length)
|
||||||
data += [idx, *split_value]
|
data += [id_, *split_value]
|
||||||
params = [start_address, data_length, *data]
|
params = [start_address, data_length, *data]
|
||||||
length = len(ids_values) * (1 + data_length) + 4
|
length = len(ids_values) * (1 + data_length) + 4
|
||||||
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write")
|
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write")
|
||||||
@@ -331,9 +331,9 @@ class MockMotors(MockSerial):
|
|||||||
|
|
||||||
def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str:
|
def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str:
|
||||||
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
|
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
|
||||||
return_packets = b"".join(MockStatusPacket.ping(idx) for idx in ids)
|
return_packets = b"".join(MockStatusPacket.ping(id_) for id_ in ids)
|
||||||
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
stub_name = "Ping_" + "_".join([str(idx) for idx in ids])
|
stub_name = "Ping_" + "_".join([str(id_) for id_ in ids])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=ping_request,
|
receive_bytes=ping_request,
|
||||||
@@ -386,17 +386,17 @@ class MockMotors(MockSerial):
|
|||||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||||
if data_name == "Present_Position":
|
if data_name == "Present_Position":
|
||||||
return_packets = b"".join(
|
return_packets = b"".join(
|
||||||
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items()
|
||||||
)
|
)
|
||||||
elif data_name == "Model_Number":
|
elif data_name == "Model_Number":
|
||||||
return_packets = b"".join(
|
return_packets = b"".join(
|
||||||
MockStatusPacket.model_number(idx, model_nb) for idx, model_nb in ids_values.items()
|
MockStatusPacket.model_number(id_, model_nb) for id_, model_nb in ids_values.items()
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=sync_read_request,
|
receive_bytes=sync_read_request,
|
||||||
@@ -409,7 +409,7 @@ class MockMotors(MockSerial):
|
|||||||
) -> str:
|
) -> str:
|
||||||
address, length = self.ctrl_table[data_name]
|
address, length = self.ctrl_table[data_name]
|
||||||
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
|
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
|
||||||
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=sync_read_request,
|
receive_bytes=sync_read_request,
|
||||||
|
|||||||
@@ -92,17 +92,17 @@ def test_abc_implementation(dummy_motors):
|
|||||||
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("idx", [1, 2, 3])
|
@pytest.mark.parametrize("id_", [1, 2, 3])
|
||||||
def test_ping(idx, mock_motors, dummy_motors):
|
def test_ping(id_, mock_motors, dummy_motors):
|
||||||
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
|
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model]
|
||||||
stub_name = mock_motors.build_ping_stub(idx, expected_model_nb)
|
stub_name = mock_motors.build_ping_stub(id_, expected_model_nb)
|
||||||
motors_bus = DynamixelMotorsBus(
|
motors_bus = DynamixelMotorsBus(
|
||||||
port=mock_motors.port,
|
port=mock_motors.port,
|
||||||
motors=dummy_motors,
|
motors=dummy_motors,
|
||||||
)
|
)
|
||||||
motors_bus.connect(assert_motors_exist=False)
|
motors_bus.connect(assert_motors_exist=False)
|
||||||
|
|
||||||
ping_model_nb = motors_bus.ping(idx)
|
ping_model_nb = motors_bus.ping(id_)
|
||||||
|
|
||||||
assert ping_model_nb == expected_model_nb
|
assert ping_model_nb == expected_model_nb
|
||||||
assert mock_motors.stubs[stub_name].called
|
assert mock_motors.stubs[stub_name].called
|
||||||
|
|||||||
@@ -93,18 +93,18 @@ def test_abc_implementation(dummy_motors):
|
|||||||
|
|
||||||
|
|
||||||
# @pytest.mark.skip("TODO")
|
# @pytest.mark.skip("TODO")
|
||||||
@pytest.mark.parametrize("idx", [1, 2, 3])
|
@pytest.mark.parametrize("id_", [1, 2, 3])
|
||||||
def test_ping(idx, mock_motors, dummy_motors):
|
def test_ping(id_, mock_motors, dummy_motors):
|
||||||
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
|
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model]
|
||||||
ping_stub = mock_motors.build_ping_stub(idx)
|
ping_stub = mock_motors.build_ping_stub(id_)
|
||||||
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, expected_model_nb)
|
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", id_, expected_model_nb)
|
||||||
motors_bus = FeetechMotorsBus(
|
motors_bus = FeetechMotorsBus(
|
||||||
port=mock_motors.port,
|
port=mock_motors.port,
|
||||||
motors=dummy_motors,
|
motors=dummy_motors,
|
||||||
)
|
)
|
||||||
motors_bus.connect(assert_motors_exist=False)
|
motors_bus.connect(assert_motors_exist=False)
|
||||||
|
|
||||||
ping_model_nb = motors_bus.ping(idx)
|
ping_model_nb = motors_bus.ping(id_)
|
||||||
|
|
||||||
assert ping_model_nb == expected_model_nb
|
assert ping_model_nb == expected_model_nb
|
||||||
assert mock_motors.stubs[ping_stub].called
|
assert mock_motors.stubs[ping_stub].called
|
||||||
|
|||||||
Reference in New Issue
Block a user