diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index 42fd85df..0d100bb1 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -1,5 +1,4 @@ import abc -import random from typing import Callable import dynamixel_sdk as dxl @@ -178,7 +177,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): Helper class to build valid Dynamixel Protocol 2.0 Instruction Packets. Protocol 2.0 Instruction Packet structure - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction-packet) + https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction-packet | Header | Packet ID | Length | Instruction | Params | CRC | | ------------------- | --------- | ----------- | ----------- | ----------------- | ----------- | @@ -206,7 +205,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): ) -> bytes: """ Builds a "Ping" broadcast instruction. - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01) + https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01 No parameters required. """ @@ -222,7 +221,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): ) -> bytes: """ Builds a "Sync_Read" broadcast instruction. - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82) + https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82 The parameters for Sync_Read (Protocol 2.0) are: param[0] = start_address L @@ -256,7 +255,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): ) -> bytes: """ Builds a "Sync_Write" broadcast instruction. - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-write-0x83) + https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-write-0x83 The parameters for Sync_Write (Protocol 2.0) are: param[0] = start_address L @@ -304,7 +303,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): ) -> bytes: """ Builds a "Write" instruction. - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#write-0x03) + https://emanual.robotis.com/docs/en/dxl/protocol2/#write-0x03 The parameters for Write (Protocol 2.0) are: param[0] = start_address L @@ -334,7 +333,7 @@ class MockStatusPacket(MockDynamixelPacketv2): Helper class to build valid Dynamixel Protocol 2.0 Status Packets. Protocol 2.0 Status Packet structure - (from https://emanual.robotis.com/docs/en/dxl/protocol2/#status-packet) + https://emanual.robotis.com/docs/en/dxl/protocol2/#status-packet | Header | Packet ID | Length | Instruction | Error | Params | CRC | | ------------------- | --------- | ----------- | ----------- | ----- | ----------------- | ----------- | @@ -357,8 +356,8 @@ class MockStatusPacket(MockDynamixelPacketv2): @classmethod def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes: - """Builds a 'Ping' status packet. - + """ + Builds a 'Ping' status packet. https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01 Args: @@ -376,22 +375,22 @@ class MockStatusPacket(MockDynamixelPacketv2): return cls.build(dxl_id, params=params, length=length) @classmethod - def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes: - """Builds a 'Present_Position' status packet. + def read(cls, dxl_id: int, value: int, param_length: int) -> bytes: + """ + Builds a 'Read' status packet (also works for 'Sync Read') + https://emanual.robotis.com/docs/en/dxl/protocol2/#read-0x02 + https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82 Args: dxl_id (int): ID of the servo responding. - pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it - will use a random value in the min_max_range. Defaults to None. - min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' - is None. Note that the bounds are included in the range. Defaults to (0, 4095). + value (int): Desired value to be returned in the packet. + param_length (int): The address length as reported in the control table. Returns: bytes: The raw 'Present_Position' status packet ready to be sent through serial. """ - pos = random.randint(*min_max_range) if pos is None else pos - params = [dxl.DXL_LOBYTE(pos), dxl.DXL_HIBYTE(pos), 0, 0] - length = 8 + params = DynamixelMotorsBus._split_int_to_bytes(value, param_length) + length = param_length + 4 return cls.build(dxl_id, params=params, length=length) @@ -472,18 +471,9 @@ class MockMotors(MockSerial): def build_sync_read_stub( self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 ) -> str: - """ - 'data_name' supported: - - Present_Position - """ - if data_name != "Present_Position": - raise NotImplementedError - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) - return_packets = b"".join( - MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items() - ) + return_packets = b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items()) sync_read_response = self._build_send_fn(return_packets, num_invalid_try) stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) self.stub( @@ -496,22 +486,14 @@ class MockMotors(MockSerial): def build_sequential_sync_read_stub( self, data_name: str, ids_values: dict[int, list[int]] | None = None ) -> str: - """ - 'data_name' supported: - - Present_Position - """ sequence_length = len(next(iter(ids_values.values()))) assert all(len(positions) == sequence_length for positions in ids_values.values()) - if data_name != "Present_Position": - raise NotImplementedError - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) sequential_packets = [] for count in range(sequence_length): return_packets = b"".join( - MockStatusPacket.present_position(id_, positions[count]) - for id_, positions in ids_values.items() + MockStatusPacket.read(id_, positions[count], length) for id_, positions in ids_values.items() ) sequential_packets.append(return_packets) diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index 291052ae..1636c113 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -1,5 +1,4 @@ import abc -import random from typing import Callable import scservo_sdk as scs @@ -248,40 +247,19 @@ class MockStatusPacket(MockFeetechPacket): return cls.build(scs_id, params=[], length=2, error=error) @classmethod - def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes: - """Builds a 'Present_Position' status packet. + def read(cls, scs_id: int, value: int, param_length: int) -> bytes: + """Builds a 'Read' status packet. Args: - scs_id (int): List of the servos ids. - pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it - will use a random value in the min_max_range. Defaults to None. - min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' - is None. Note that the bounds are included in the range. Defaults to (0, 4095). + scs_id (int): ID of the servo responding. + value (int): Desired value to be returned in the packet. + param_length (int): The address length as reported in the control table. Returns: - bytes: The raw 'Present_Position' status packet ready to be sent through serial. + bytes: The raw 'Sync Read' status packet ready to be sent through serial. """ - pos = random.randint(*min_max_range) if pos is None else pos - params = [scs.SCS_LOBYTE(pos), scs.SCS_HIBYTE(pos)] - length = 4 - return cls.build(scs_id, params=params, length=length) - - @classmethod - def model_number(cls, scs_id: int, model_nb: int | None = None) -> bytes: - """Builds a 'Present_Position' status packet. - - Args: - scs_id (int): List of the servos ids. - pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it - will use a random value in the min_max_range. Defaults to None. - min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' - is None. Note that the bounds are included in the range. Defaults to (0, 4095). - - Returns: - bytes: The raw 'Present_Position' status packet ready to be sent through serial. - """ - params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb)] - length = 4 + params = FeetechMotorsBus._split_int_to_bytes(value, param_length) + length = param_length + 2 return cls.build(scs_id, params=params, length=length) @@ -368,7 +346,7 @@ class MockMotors(MockSerial): raise NotImplementedError address, length = self.ctrl_table[data_name] read_request = MockInstructionPacket.read(scs_id, address, length) - return_packet = MockStatusPacket.model_number(scs_id, value) + return_packet = MockStatusPacket.read(scs_id, value, length) read_response = self._build_send_fn(return_packet, num_invalid_try) stub_name = f"Read_{data_name}_{scs_id}" self.stub( @@ -381,23 +359,9 @@ class MockMotors(MockSerial): def build_sync_read_stub( self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 ) -> str: - """ - 'data_name' supported: - - Present_Position - - Model_Number - """ address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) - if data_name == "Present_Position": - return_packets = b"".join( - MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items() - ) - elif data_name == "Model_Number": - return_packets = b"".join( - MockStatusPacket.model_number(id_, model_nb) for id_, model_nb in ids_values.items() - ) - else: - raise NotImplementedError + return_packets = b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items()) sync_read_response = self._build_send_fn(return_packets, num_invalid_try) stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) @@ -411,22 +375,14 @@ class MockMotors(MockSerial): def build_sequential_sync_read_stub( self, data_name: str, ids_values: dict[int, list[int]] | None = None ) -> str: - """ - 'data_name' supported: - - Present_Position - """ sequence_length = len(next(iter(ids_values.values()))) assert all(len(positions) == sequence_length for positions in ids_values.values()) - if data_name != "Present_Position": - raise NotImplementedError - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) sequential_packets = [] for count in range(sequence_length): return_packets = b"".join( - MockStatusPacket.present_position(id_, positions[count]) - for id_, positions in ids_values.items() + MockStatusPacket.read(id_, positions[count], length) for id_, positions in ids_values.items() ) sequential_packets.append(return_packets)