diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index 92e47c59..42fd85df 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -493,6 +493,37 @@ class MockMotors(MockSerial): ) return stub_name + def build_sequential_sync_read_stub( + self, data_name: str, ids_values: dict[int, list[int]] | None = None + ) -> str: + """ + 'data_name' supported: + - Present_Position + """ + sequence_length = len(next(iter(ids_values.values()))) + assert all(len(positions) == sequence_length for positions in ids_values.values()) + if data_name != "Present_Position": + raise NotImplementedError + + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) + sequential_packets = [] + for count in range(sequence_length): + return_packets = b"".join( + MockStatusPacket.present_position(id_, positions[count]) + for id_, positions in ids_values.items() + ) + sequential_packets.append(return_packets) + + sync_read_response = self._build_sequential_send_fn(sequential_packets) + stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) + self.stub( + name=stub_name, + receive_bytes=sync_read_request, + send_fn=sync_read_response, + ) + return stub_name + def build_sync_write_stub( self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 ) -> str: @@ -528,3 +559,10 @@ class MockMotors(MockSerial): return packet return send_fn + + @staticmethod + def _build_sequential_send_fn(packets: list[bytes]) -> Callable[[int], bytes]: + def send_fn(_call_count: int) -> bytes: + return packets[_call_count - 1] + + return send_fn diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index c5fab0d9..473433bf 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -404,6 +404,37 @@ class MockMotors(MockSerial): ) return stub_name + def build_sequential_sync_read_stub( + self, data_name: str, ids_values: dict[int, list[int]] | None = None + ) -> str: + """ + 'data_name' supported: + - Present_Position + """ + sequence_length = len(next(iter(ids_values.values()))) + assert all(len(positions) == sequence_length for positions in ids_values.values()) + if data_name != "Present_Position": + raise NotImplementedError + + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) + sequential_packets = [] + for count in range(sequence_length): + return_packets = b"".join( + MockStatusPacket.present_position(id_, positions[count]) + for id_, positions in ids_values.items() + ) + sequential_packets.append(return_packets) + + sync_read_response = self._build_sequential_send_fn(sequential_packets) + stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) + self.stub( + name=stub_name, + receive_bytes=sync_read_request, + send_fn=sync_read_response, + ) + return stub_name + def build_sync_write_stub( self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 ) -> str: @@ -439,3 +470,10 @@ class MockMotors(MockSerial): return packet return send_fn + + @staticmethod + def _build_sequential_send_fn(packets: list[bytes]) -> Callable[[int], bytes]: + def send_fn(_call_count: int) -> bytes: + return packets[_call_count - 1] + + return send_fn diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index b2483f5b..83fe23bc 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -1,12 +1,13 @@ import sys from typing import Generator -from unittest.mock import patch +from unittest.mock import MagicMock, patch import dynamixel_sdk as dxl import pytest from lerobot.common.motors import Motor, MotorNormMode from lerobot.common.motors.dynamixel import MODEL_NUMBER, DynamixelMotorsBus +from lerobot.common.utils.encoding_utils import encode_twos_complement from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler @@ -432,3 +433,92 @@ def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors): motors_bus.write(data_name, f"dummy_{dxl_id}", value, normalize=False) assert mock_motors.stubs[stub_name].called + + +def test_reset_calibration(mock_motors, dummy_motors): + write_homing_stubs = [] + write_mins_stubs = [] + write_maxes_stubs = [] + for motor in dummy_motors.values(): + write_homing_stubs.append(mock_motors.build_write_stub("Homing_Offset", motor.id, 0)) + write_mins_stubs.append(mock_motors.build_write_stub("Min_Position_Limit", motor.id, 0)) + write_maxes_stubs.append(mock_motors.build_write_stub("Max_Position_Limit", motor.id, 4095)) + + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + motors_bus.reset_calibration() + + assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) + assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) + assert all(mock_motors.stubs[stub].called for stub in write_maxes_stubs) + + +def test_set_half_turn_homings(mock_motors, dummy_motors): + """ + For this test, we assume that the homing offsets are already 0 such that + Present_Position == Actual_Position + """ + current_positions = { + 1: 1337, + 2: 42, + 3: 3672, + } + expected_homings = { + 1: 710, # 2047 - 1337 + 2: 2005, # 2047 - 42 + 3: -1625, # 2047 - 3672 + } + read_pos_stub = mock_motors.build_sync_read_stub("Present_Position", current_positions) + write_homing_stubs = [] + for id_, homing in expected_homings.items(): + encoded_homing = encode_twos_complement(homing, 4) + stub = mock_motors.build_write_stub("Homing_Offset", id_, encoded_homing) + write_homing_stubs.append(stub) + + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + motors_bus.reset_calibration = MagicMock() + + motors_bus.set_half_turn_homings() + + motors_bus.reset_calibration.assert_called_once() + assert mock_motors.stubs[read_pos_stub].called + assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) + + +def test_record_ranges_of_motion(mock_motors, dummy_motors): + positions = { + 1: [351, 42, 1337], + 2: [28, 3600, 2444], + 3: [4002, 2999, 146], + } + expected_mins = { + "dummy_1": 42, + "dummy_2": 28, + "dummy_3": 146, + } + expected_maxes = { + "dummy_1": 1337, + "dummy_2": 3600, + "dummy_3": 4002, + } + read_pos_stub = mock_motors.build_sequential_sync_read_stub("Present_Position", positions) + with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) + + assert mock_motors.stubs[read_pos_stub].calls == 3 + assert mins == expected_mins + assert maxes == expected_maxes diff --git a/tests/motors/test_feetech.py b/tests/motors/test_feetech.py index 023ce332..5a9298c2 100644 --- a/tests/motors/test_feetech.py +++ b/tests/motors/test_feetech.py @@ -1,12 +1,13 @@ import sys from typing import Generator -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest import scservo_sdk as scs from lerobot.common.motors import Motor, MotorNormMode from lerobot.common.motors.feetech import MODEL_NUMBER, FeetechMotorsBus +from lerobot.common.utils.encoding_utils import encode_sign_magnitude from tests.mocks.mock_feetech import MockMotors, MockPortHandler @@ -453,3 +454,92 @@ def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors): motors_bus.write(data_name, f"dummy_{dxl_id}", value, normalize=False) assert mock_motors.stubs[stub_name].called + + +def test_reset_calibration(mock_motors, dummy_motors): + write_homing_stubs = [] + write_mins_stubs = [] + write_maxes_stubs = [] + for motor in dummy_motors.values(): + write_homing_stubs.append(mock_motors.build_write_stub("Homing_Offset", motor.id, 0)) + write_mins_stubs.append(mock_motors.build_write_stub("Min_Position_Limit", motor.id, 0)) + write_maxes_stubs.append(mock_motors.build_write_stub("Max_Position_Limit", motor.id, 4095)) + + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + motors_bus.reset_calibration() + + assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) + assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) + assert all(mock_motors.stubs[stub].called for stub in write_maxes_stubs) + + +def test_set_half_turn_homings(mock_motors, dummy_motors): + """ + For this test, we assume that the homing offsets are already 0 such that + Present_Position == Actual_Position + """ + current_positions = { + 1: 1337, + 2: 42, + 3: 3672, + } + expected_homings = { + 1: -710, # 1337 - 2047 + 2: -2005, # 42 - 2047 + 3: 1625, # 3672 - 2047 + } + read_pos_stub = mock_motors.build_sync_read_stub("Present_Position", current_positions) + write_homing_stubs = [] + for id_, homing in expected_homings.items(): + encoded_homing = encode_sign_magnitude(homing, 11) + stub = mock_motors.build_write_stub("Homing_Offset", id_, encoded_homing) + write_homing_stubs.append(stub) + + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + motors_bus.reset_calibration = MagicMock() + + motors_bus.set_half_turn_homings() + + motors_bus.reset_calibration.assert_called_once() + assert mock_motors.stubs[read_pos_stub].called + assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) + + +def test_record_ranges_of_motion(mock_motors, dummy_motors): + positions = { + 1: [351, 42, 1337], + 2: [28, 3600, 2444], + 3: [4002, 2999, 146], + } + expected_mins = { + "dummy_1": 42, + "dummy_2": 28, + "dummy_3": 146, + } + expected_maxes = { + "dummy_1": 1337, + "dummy_2": 3600, + "dummy_3": 4002, + } + read_pos_stub = mock_motors.build_sequential_sync_read_stub("Present_Position", positions) + with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) + + assert mock_motors.stubs[read_pos_stub].calls == 3 + assert mins == expected_mins + assert maxes == expected_maxes