Mock dynamixel_sdk

This commit is contained in:
Remi Cadene
2024-09-12 01:08:44 +02:00
parent 3bd5ea4d7a
commit 4151630c24
3 changed files with 52 additions and 17 deletions

View File

@@ -1,9 +1,18 @@
from dynamixel_sdk import COMM_SUCCESS
DEFAULT_BAUDRATE = 9_600
class PortHandler:
def mock_convert_to_bytes(value, bytes):
del bytes # unused
return value
class MockPortHandler:
def __init__(self, port):
self.port = port
# default fresh from factory baudrate
self.baudrate = DEFAULT_BAUDRATE
def openPort(self): # noqa: N802
return True
@@ -14,35 +23,53 @@ class PortHandler:
def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802
del timeout_ms # unused
def getBaudRate(self): # noqa: N802
return self.baudrate
class PacketHandler:
def setBaudRate(self, baudrate): # noqa: N802
self.baudrate = baudrate
class MockPacketHandler:
def __init__(self, protocol_version):
del protocol_version # unused
# Use packet_handler.data to communicate across Read and Write
self.data = {}
class GroupSyncRead:
class MockGroupSyncRead:
def __init__(self, port_handler, packet_handler, address, bytes):
pass
self.packet_handler = packet_handler
def addParam(self, motor_index): # noqa: N802
pass
if motor_index not in self.packet_handler.data:
# Initialize motor default values
self.packet_handler.data[motor_index] = {
# Key (int) are from X_SERIES_CONTROL_TABLE
7: motor_index, # ID
8: DEFAULT_BAUDRATE, # Baud_rate
10: 0, # Drive_Mode
64: 0, # Torque_Enable
132: 0, # Present_Position
}
def txRxPacket(self): # noqa: N802
return COMM_SUCCESS
def getData(self, index, address, bytes): # noqa: N802
return value # noqa: F821
return self.packet_handler.data[index][address]
class GroupSyncWrite:
class MockGroupSyncWrite:
def __init__(self, port_handler, packet_handler, address, bytes):
pass
self.packet_handler = packet_handler
self.address = address
def addParam(self, index, data): # noqa: N802
pass
self.changeParam(index, data)
def txPacket(self): # noqa: N802
return COMM_SUCCESS
def changeParam(self, index, data): # noqa: N802
pass
self.packet_handler.data[index][self.address] = data

View File

@@ -62,7 +62,7 @@ def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
# TODO(rcadene): implement mocked version of this test
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_find_port(request, robot_type):
def test_find_port(request, motor_type):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()
@@ -71,10 +71,10 @@ def test_find_port(request, robot_type):
# TODO(rcadene): implement mocked version of this test
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_configure_motors_all_ids_1(request, robot_type):
def test_configure_motors_all_ids_1(request, motor_type):
input("Are you sure you want to re-configure the motors? Press enter to continue...")
# This test expect the configuration was already correct.
motors_bus = make_motors_bus(robot_type)
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors))
motors_bus.set_bus_baudrate(9_600)
@@ -82,7 +82,7 @@ def test_configure_motors_all_ids_1(request, robot_type):
del motors_bus
# Test configure
motors_bus = make_motors_bus(robot_type)
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
assert motors_bus.are_motors_configured()
del motors_bus
@@ -90,8 +90,8 @@ def test_configure_motors_all_ids_1(request, robot_type):
@pytest.mark.parametrize("motor_type", TEST_MOTOR_TYPES)
@require_motor
def test_motors_bus(request, robot_type):
motors_bus = make_motors_bus(robot_type)
def test_motors_bus(request, motor_type):
motors_bus = make_motors_bus(motor_type)
# Test reading and writting before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError):
@@ -105,7 +105,7 @@ def test_motors_bus(request, robot_type):
del motors_bus
# Test connecting
motors_bus = make_motors_bus(robot_type)
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
# Test connecting twice raises an error

View File

@@ -312,11 +312,19 @@ def mock_motors(request):
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
mock_convert_to_bytes,
)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite)
monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler)
monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
# Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes
from lerobot.common.robot_devices.motors import dynamixel
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value`
monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes)
except ImportError:
traceback.print_exc()