From 10119c1a5955dc855c2f630caede7b8cdd5f0b2a Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 18 May 2025 11:51:47 +0200 Subject: [PATCH] Move MockMotorsBus --- tests/mocks/mock_motors_bus.py | 137 +++++++++++++++++++++++++++++++ tests/motors/test_motors_bus.py | 140 ++------------------------------ 2 files changed, 143 insertions(+), 134 deletions(-) create mode 100644 tests/mocks/mock_motors_bus.py diff --git a/tests/mocks/mock_motors_bus.py b/tests/mocks/mock_motors_bus.py new file mode 100644 index 00000000..2a35dfb3 --- /dev/null +++ b/tests/mocks/mock_motors_bus.py @@ -0,0 +1,137 @@ +# ruff: noqa: N802 + +from lerobot.common.motors.motors_bus import ( + Motor, + MotorsBus, +) + +DUMMY_CTRL_TABLE_1 = { + "Firmware_Version": (0, 1), + "Model_Number": (1, 2), + "Present_Position": (3, 4), + "Goal_Position": (11, 2), +} + +DUMMY_CTRL_TABLE_2 = { + "Model_Number": (0, 2), + "Firmware_Version": (2, 1), + "Present_Position": (3, 4), + "Present_Velocity": (7, 4), + "Goal_Position": (11, 4), + "Goal_Velocity": (15, 4), + "Lock": (19, 1), +} + +DUMMY_MODEL_CTRL_TABLE = { + "model_1": DUMMY_CTRL_TABLE_1, + "model_2": DUMMY_CTRL_TABLE_2, + "model_3": DUMMY_CTRL_TABLE_2, +} + +DUMMY_BAUDRATE_TABLE = { + 0: 1_000_000, + 1: 500_000, + 2: 250_000, +} + +DUMMY_MODEL_BAUDRATE_TABLE = { + "model_1": DUMMY_BAUDRATE_TABLE, + "model_2": DUMMY_BAUDRATE_TABLE, + "model_3": DUMMY_BAUDRATE_TABLE, +} + +DUMMY_ENCODING_TABLE = { + "Present_Position": 8, + "Goal_Position": 10, +} + +DUMMY_MODEL_ENCODING_TABLE = { + "model_1": DUMMY_ENCODING_TABLE, + "model_2": DUMMY_ENCODING_TABLE, + "model_3": DUMMY_ENCODING_TABLE, +} + +DUMMY_MODEL_NUMBER_TABLE = { + "model_1": 1234, + "model_2": 5678, + "model_3": 5799, +} + +DUMMY_MODEL_RESOLUTION_TABLE = { + "model_1": 4096, + "model_2": 1024, + "model_3": 4096, +} + + +class MockPortHandler: + def __init__(self, port_name): + self.is_open: bool = False + self.baudrate: int + self.packet_start_time: float + self.packet_timeout: float + self.tx_time_per_byte: float + self.is_using: bool = False + self.port_name: str = port_name + self.ser = None + + def openPort(self): + self.is_open = True + return self.is_open + + def closePort(self): + self.is_open = False + + def clearPort(self): ... + def setPortName(self, port_name): + self.port_name = port_name + + def getPortName(self): + return self.port_name + + def setBaudRate(self, baudrate): + self.baudrate: baudrate + + def getBaudRate(self): + return self.baudrate + + def getBytesAvailable(self): ... + def readPort(self, length): ... + def writePort(self, packet): ... + def setPacketTimeout(self, packet_length): ... + def setPacketTimeoutMillis(self, msec): ... + def isPacketTimeout(self): ... + def getCurrentTime(self): ... + def getTimeSinceStart(self): ... + def setupPort(self, cflag_baud): ... + def getCFlagBaud(self, baudrate): ... + + +class MockMotorsBus(MotorsBus): + available_baudrates = [500_000, 1_000_000] + default_timeout = 1000 + model_baudrate_table = DUMMY_MODEL_BAUDRATE_TABLE + model_ctrl_table = DUMMY_MODEL_CTRL_TABLE + model_encoding_table = DUMMY_MODEL_ENCODING_TABLE + model_number_table = DUMMY_MODEL_NUMBER_TABLE + model_resolution_table = DUMMY_MODEL_RESOLUTION_TABLE + normalized_data = ["Present_Position", "Goal_Position"] + + def __init__(self, port: str, motors: dict[str, Motor]): + super().__init__(port, motors) + self.port_handler = MockPortHandler(port) + + def _assert_protocol_is_compatible(self, instruction_name): ... + def _handshake(self): ... + def _find_single_motor(self, motor, initial_baudrate): ... + def configure_motors(self): ... + def read_calibration(self): ... + def write_calibration(self, calibration_dict): ... + def disable_torque(self, motors, num_retry): ... + def _disable_torque(self, motor, model, num_retry): ... + def enable_torque(self, motors, num_retry): ... + def _get_half_turn_homings(self, positions): ... + def _encode_sign(self, data_name, ids_values): ... + def _decode_sign(self, data_name, ids_values): ... + def _split_into_byte_chunks(self, value, length): ... + def broadcast_ping(self, num_retry, raise_on_error): ... diff --git a/tests/motors/test_motors_bus.py b/tests/motors/test_motors_bus.py index 9056e870..fab2d6a1 100644 --- a/tests/motors/test_motors_bus.py +++ b/tests/motors/test_motors_bus.py @@ -1,5 +1,3 @@ -# ruff: noqa: N802 - import re from unittest.mock import patch @@ -8,142 +6,16 @@ import pytest from lerobot.common.motors.motors_bus import ( Motor, MotorNormMode, - MotorsBus, assert_same_address, get_address, get_ctrl_table, ) - -DUMMY_CTRL_TABLE_1 = { - "Firmware_Version": (0, 1), - "Model_Number": (1, 2), - "Present_Position": (3, 4), - "Goal_Position": (11, 2), -} - -DUMMY_CTRL_TABLE_2 = { - "Model_Number": (0, 2), - "Firmware_Version": (2, 1), - "Present_Position": (3, 4), - "Present_Velocity": (7, 4), - "Goal_Position": (11, 4), - "Goal_Velocity": (15, 4), - "Lock": (19, 1), -} - -DUMMY_MODEL_CTRL_TABLE = { - "model_1": DUMMY_CTRL_TABLE_1, - "model_2": DUMMY_CTRL_TABLE_2, - "model_3": DUMMY_CTRL_TABLE_2, -} - -DUMMY_BAUDRATE_TABLE = { - 0: 1_000_000, - 1: 500_000, - 2: 250_000, -} - -DUMMY_MODEL_BAUDRATE_TABLE = { - "model_1": DUMMY_BAUDRATE_TABLE, - "model_2": DUMMY_BAUDRATE_TABLE, - "model_3": DUMMY_BAUDRATE_TABLE, -} - -DUMMY_ENCODING_TABLE = { - "Present_Position": 8, - "Goal_Position": 10, -} - -DUMMY_MODEL_ENCODING_TABLE = { - "model_1": DUMMY_ENCODING_TABLE, - "model_2": DUMMY_ENCODING_TABLE, - "model_3": DUMMY_ENCODING_TABLE, -} - -DUMMY_MODEL_NUMBER_TABLE = { - "model_1": 1234, - "model_2": 5678, - "model_3": 5799, -} - -DUMMY_MODEL_RESOLUTION_TABLE = { - "model_1": 4096, - "model_2": 1024, - "model_3": 4096, -} - - -class MockPortHandler: - def __init__(self, port_name): - self.is_open: bool = False - self.baudrate: int - self.packet_start_time: float - self.packet_timeout: float - self.tx_time_per_byte: float - self.is_using: bool = False - self.port_name: str = port_name - self.ser = None - - def openPort(self): - self.is_open = True - return self.is_open - - def closePort(self): - self.is_open = False - - def clearPort(self): ... - def setPortName(self, port_name): - self.port_name = port_name - - def getPortName(self): - return self.port_name - - def setBaudRate(self, baudrate): - self.baudrate: baudrate - - def getBaudRate(self): - return self.baudrate - - def getBytesAvailable(self): ... - def readPort(self, length): ... - def writePort(self, packet): ... - def setPacketTimeout(self, packet_length): ... - def setPacketTimeoutMillis(self, msec): ... - def isPacketTimeout(self): ... - def getCurrentTime(self): ... - def getTimeSinceStart(self): ... - def setupPort(self, cflag_baud): ... - def getCFlagBaud(self, baudrate): ... - - -class MockMotorsBus(MotorsBus): - available_baudrates = [500_000, 1_000_000] - default_timeout = 1000 - model_baudrate_table = DUMMY_MODEL_BAUDRATE_TABLE - model_ctrl_table = DUMMY_MODEL_CTRL_TABLE - model_encoding_table = DUMMY_MODEL_ENCODING_TABLE - model_number_table = DUMMY_MODEL_NUMBER_TABLE - model_resolution_table = DUMMY_MODEL_RESOLUTION_TABLE - normalized_data = ["Present_Position", "Goal_Position"] - - def __init__(self, port: str, motors: dict[str, Motor]): - super().__init__(port, motors) - self.port_handler = MockPortHandler(port) - - def _assert_protocol_is_compatible(self, instruction_name): ... - def _handshake(self): ... - def _find_single_motor(self, motor, initial_baudrate): ... - def configure_motors(self): ... - def read_calibration(self): ... - def write_calibration(self, calibration_dict): ... - def disable_torque(self, motors, num_retry): ... - def _disable_torque(self, motor, model, num_retry): ... - def enable_torque(self, motors, num_retry): ... - def _get_half_turn_homings(self, positions): ... - def _encode_sign(self, data_name, ids_values): ... - def _decode_sign(self, data_name, ids_values): ... - def _split_into_byte_chunks(self, value, length): ... - def broadcast_ping(self, num_retry, raise_on_error): ... +from tests.mocks.mock_motors_bus import ( + DUMMY_CTRL_TABLE_1, + DUMMY_CTRL_TABLE_2, + DUMMY_MODEL_CTRL_TABLE, + MockMotorsBus, +) @pytest.fixture