From 4151630c2448aced5c8332aed2f9737b33f4b90d Mon Sep 17 00:00:00 2001 From: Remi Cadene Date: Thu, 12 Sep 2024 01:08:44 +0200 Subject: [PATCH] Mock dynamixel_sdk --- tests/mock_dynamixel.py | 47 ++++++++++++++++++++++++++++++++--------- tests/test_motors.py | 14 ++++++------ tests/utils.py | 8 +++++++ 3 files changed, 52 insertions(+), 17 deletions(-) diff --git a/tests/mock_dynamixel.py b/tests/mock_dynamixel.py index 16831a726..9a2e62612 100644 --- a/tests/mock_dynamixel.py +++ b/tests/mock_dynamixel.py @@ -1,9 +1,18 @@ from dynamixel_sdk import COMM_SUCCESS +DEFAULT_BAUDRATE = 9_600 -class PortHandler: + +def mock_convert_to_bytes(value, bytes): + del bytes # unused + return value + + +class MockPortHandler: def __init__(self, port): self.port = port + # default fresh from factory baudrate + self.baudrate = DEFAULT_BAUDRATE def openPort(self): # noqa: N802 return True @@ -14,35 +23,53 @@ class PortHandler: def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802 del timeout_ms # unused + def getBaudRate(self): # noqa: N802 + return self.baudrate -class PacketHandler: + def setBaudRate(self, baudrate): # noqa: N802 + self.baudrate = baudrate + + +class MockPacketHandler: def __init__(self, protocol_version): del protocol_version # unused + # Use packet_handler.data to communicate across Read and Write + self.data = {} -class GroupSyncRead: +class MockGroupSyncRead: def __init__(self, port_handler, packet_handler, address, bytes): - pass + self.packet_handler = packet_handler def addParam(self, motor_index): # noqa: N802 - pass + if motor_index not in self.packet_handler.data: + # Initialize motor default values + self.packet_handler.data[motor_index] = { + # Key (int) are from X_SERIES_CONTROL_TABLE + 7: motor_index, # ID + 8: DEFAULT_BAUDRATE, # Baud_rate + 10: 0, # Drive_Mode + 64: 0, # Torque_Enable + 132: 0, # Present_Position + } def txRxPacket(self): # noqa: N802 return COMM_SUCCESS def getData(self, index, address, bytes): # noqa: N802 - return value # noqa: F821 + return self.packet_handler.data[index][address] -class GroupSyncWrite: +class MockGroupSyncWrite: def __init__(self, port_handler, packet_handler, address, bytes): - pass + self.packet_handler = packet_handler + self.address = address def addParam(self, index, data): # noqa: N802 - pass + self.changeParam(index, data) def txPacket(self): # noqa: N802 return COMM_SUCCESS def changeParam(self, index, data): # noqa: N802 - pass + self.packet_handler.data[index][self.address] = data diff --git a/tests/test_motors.py b/tests/test_motors.py index 63ee3f1db..dfdb33183 100644 --- a/tests/test_motors.py +++ b/tests/test_motors.py @@ -62,7 +62,7 @@ def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus: # TODO(rcadene): implement mocked version of this test @pytest.mark.parametrize("motor_type", available_motors) @require_motor -def test_find_port(request, robot_type): +def test_find_port(request, motor_type): from lerobot.common.robot_devices.motors.dynamixel import find_port find_port() @@ -71,10 +71,10 @@ def test_find_port(request, robot_type): # TODO(rcadene): implement mocked version of this test @pytest.mark.parametrize("motor_type", available_motors) @require_motor -def test_configure_motors_all_ids_1(request, robot_type): +def test_configure_motors_all_ids_1(request, motor_type): input("Are you sure you want to re-configure the motors? Press enter to continue...") # This test expect the configuration was already correct. - motors_bus = make_motors_bus(robot_type) + motors_bus = make_motors_bus(motor_type) motors_bus.connect() motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors)) motors_bus.set_bus_baudrate(9_600) @@ -82,7 +82,7 @@ def test_configure_motors_all_ids_1(request, robot_type): del motors_bus # Test configure - motors_bus = make_motors_bus(robot_type) + motors_bus = make_motors_bus(motor_type) motors_bus.connect() assert motors_bus.are_motors_configured() del motors_bus @@ -90,8 +90,8 @@ def test_configure_motors_all_ids_1(request, robot_type): @pytest.mark.parametrize("motor_type", TEST_MOTOR_TYPES) @require_motor -def test_motors_bus(request, robot_type): - motors_bus = make_motors_bus(robot_type) +def test_motors_bus(request, motor_type): + motors_bus = make_motors_bus(motor_type) # Test reading and writting before connecting raises an error with pytest.raises(RobotDeviceNotConnectedError): @@ -105,7 +105,7 @@ def test_motors_bus(request, robot_type): del motors_bus # Test connecting - motors_bus = make_motors_bus(robot_type) + motors_bus = make_motors_bus(motor_type) motors_bus.connect() # Test connecting twice raises an error diff --git a/tests/utils.py b/tests/utils.py index af8d7faf7..433be47da 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -312,11 +312,19 @@ def mock_motors(request): MockGroupSyncWrite, MockPacketHandler, MockPortHandler, + mock_convert_to_bytes, ) monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead) monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite) monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler) monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler) + + # Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes + from lerobot.common.robot_devices.motors import dynamixel + + # TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform + # `convert_bytes_to_value` + monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes) except ImportError: traceback.print_exc()