fix unit tests

This commit is contained in:
Remi Cadene
2024-09-26 16:28:08 +02:00
parent 395720a5de
commit 48be576cc6
6 changed files with 349 additions and 186 deletions

View File

@@ -14,9 +14,72 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import traceback
from tests.utils import DEVICE import pytest
from lerobot import available_cameras, available_motors, available_robots
from lerobot.common.utils.utils import init_hydra_config
from tests.utils import DEVICE, ROBOT_CONFIG_PATH_TEMPLATE, make_camera, make_motors_bus
def pytest_collection_finish(): def pytest_collection_finish():
print(f"\nTesting with {DEVICE=}") print(f"\nTesting with {DEVICE=}")
@pytest.fixture
def is_robot_available(robot_type):
if robot_type not in available_robots:
raise ValueError(
f"The robot type '{robot_type}' is not valid. Expected one of these '{available_robots}"
)
try:
from lerobot.common.robot_devices.robots.factory import make_robot
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path)
robot = make_robot(robot_cfg)
robot.connect()
del robot
return True
except Exception:
traceback.print_exc()
print(f"\nA {robot_type} robot is not available.")
return False
@pytest.fixture
def is_camera_available(camera_type):
if camera_type not in available_cameras:
raise ValueError(
f"The camera type '{camera_type}' is not valid. Expected one of these '{available_cameras}"
)
try:
camera = make_camera(camera_type)
camera.connect()
del camera
return True
except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.")
return False
@pytest.fixture
def is_motor_available(motor_type):
if motor_type not in available_motors:
raise ValueError(
f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}"
)
try:
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
del motors_bus
return True
except Exception:
traceback.print_exc()
print(f"\nA {motor_type} motor is not available.")
return False

View File

@@ -24,8 +24,9 @@ pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]'
import numpy as np import numpy as np
import pytest import pytest
from lerobot import available_cameras
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera from tests.utils import make_camera, require_camera, require_mock_camera
# Maximum absolute difference between two consecutive images recored by a camera. # Maximum absolute difference between two consecutive images recored by a camera.
# This value differs with respect to the camera. # This value differs with respect to the camera.
@@ -36,9 +37,7 @@ def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max() return np.abs(first_image.astype(float) - second_image.astype(float)).max()
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES) def _test_camera(camera_type):
@require_camera
def test_camera(request, camera_type, mock):
"""Test assumes that `camera.read()` returns the same image when called multiple times in a row. """Test assumes that `camera.read()` returns the same image when called multiple times in a row.
So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving. So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving.
@@ -140,9 +139,7 @@ def test_camera(request, camera_type, mock):
del camera del camera
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES) def _test_save_images_from_cameras(tmpdir, camera_type):
@require_camera
def test_save_images_from_cameras(tmpdir, request, camera_type, mock):
# TODO(rcadene): refactor # TODO(rcadene): refactor
if camera_type == "opencv": if camera_type == "opencv":
from lerobot.common.robot_devices.cameras.opencv import save_images_from_cameras from lerobot.common.robot_devices.cameras.opencv import save_images_from_cameras
@@ -150,3 +147,27 @@ def test_save_images_from_cameras(tmpdir, request, camera_type, mock):
from lerobot.common.robot_devices.cameras.intelrealsense import save_images_from_cameras from lerobot.common.robot_devices.cameras.intelrealsense import save_images_from_cameras
save_images_from_cameras(tmpdir, record_time_s=1) save_images_from_cameras(tmpdir, record_time_s=1)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_mock_camera
def test_camera_mock(monkeypatch, camera_type):
_test_camera(camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_camera
def test_camera(request, camera_type):
_test_camera(camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_mock_camera
def test_save_images_from_cameras_mock(tmpdir, monkeypatch, camera_type):
_test_save_images_from_cameras(tmpdir, camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_camera
def test_save_images_from_cameras(tmpdir, request, camera_type):
_test_save_images_from_cameras(tmpdir, camera_type)

View File

@@ -27,16 +27,15 @@ from pathlib import Path
import pytest import pytest
from lerobot import available_robots
from lerobot.common.policies.factory import make_policy from lerobot.common.policies.factory import make_policy
from lerobot.common.utils.utils import init_hydra_config from lerobot.common.utils.utils import init_hydra_config
from lerobot.scripts.control_robot import calibrate, get_available_arms, record, replay, teleoperate from lerobot.scripts.control_robot import calibrate, get_available_arms, record, replay, teleoperate
from tests.test_robots import make_robot from tests.test_robots import make_robot
from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, TEST_ROBOT_TYPES, require_robot from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, require_mock_robot, require_robot
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) def _test_teleoperate(robot_type):
@require_robot
def test_teleoperate(request, robot_type, mock):
robot = make_robot(robot_type) robot = make_robot(robot_type)
teleoperate(robot, teleop_time_s=1) teleoperate(robot, teleop_time_s=1)
teleoperate(robot, fps=30, teleop_time_s=1) teleoperate(robot, fps=30, teleop_time_s=1)
@@ -44,17 +43,13 @@ def test_teleoperate(request, robot_type, mock):
del robot del robot
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) def _test_calibrate(robot_type):
@require_robot
def test_calibrate(request, robot_type, mock):
robot = make_robot(robot_type) robot = make_robot(robot_type)
calibrate(robot, arms=get_available_arms(robot)) calibrate(robot, arms=get_available_arms(robot))
del robot del robot
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) def _test_record_without_cameras(tmpdir, robot_type):
@require_robot
def test_record_without_cameras(tmpdir, request, robot_type, mock):
root = Path(tmpdir) root = Path(tmpdir)
repo_id = "lerobot/debug" repo_id = "lerobot/debug"
@@ -73,9 +68,7 @@ def test_record_without_cameras(tmpdir, request, robot_type, mock):
) )
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) def _test_record_and_replay_and_policy(tmpdir, robot_type):
@require_robot
def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
env_name = "koch_real" env_name = "koch_real"
policy_name = "act_koch_real" policy_name = "act_koch_real"
@@ -122,3 +115,51 @@ def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
) )
del robot del robot
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_teleoperate_mock(monkeypatch, robot_type):
_test_teleoperate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_teleoperate(request, robot_type):
_test_teleoperate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_calibrate_mock(monkeypatch, robot_type):
_test_calibrate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_calibrate(request, robot_type):
_test_calibrate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_record_without_cameras_mock(tmpdir, monkeypatch, robot_type):
_test_record_without_cameras(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_record_without_cameras(tmpdir, request, robot_type):
_test_record_without_cameras(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_record_and_replay_and_policy_mock(tmpdir, monkeypatch, robot_type):
_test_record_and_replay_and_policy(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_record_and_replay_and_policy(tmpdir, request, robot_type):
_test_record_and_replay_and_policy(tmpdir, robot_type)

View File

@@ -30,34 +30,12 @@ import time
import numpy as np import numpy as np
import pytest import pytest
from lerobot import available_motors
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, mock_input, require_motor from tests.utils import make_motors_bus, mock_builtins_input, require_mock_motor, require_motor
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) def _test_configure_motors_all_ids_1(motor_type):
@require_motor
def test_find_port(request, motor_type, mock):
from lerobot.common.robot_devices.motors.dynamixel import find_port
if mock:
# To run find_port without user input
monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr("builtins.input", mock_input)
with pytest.raises(OSError):
find_port()
else:
find_port()
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
@require_motor
def test_configure_motors_all_ids_1(request, motor_type, mock):
if mock:
# To run find_port without user input
monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr("builtins.input", mock_input)
input("Are you sure you want to re-configure the motors? Press enter to continue...") input("Are you sure you want to re-configure the motors? Press enter to continue...")
# This test expect the configuration was already correct. # This test expect the configuration was already correct.
motors_bus = make_motors_bus(motor_type) motors_bus = make_motors_bus(motor_type)
@@ -74,9 +52,7 @@ def test_configure_motors_all_ids_1(request, motor_type, mock):
del motors_bus del motors_bus
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) def _test_motors_bus(request, motor_type, mock):
@require_motor
def test_motors_bus(request, motor_type, mock):
motors_bus = make_motors_bus(motor_type) motors_bus = make_motors_bus(motor_type)
# Test reading and writting before connecting raises an error # Test reading and writting before connecting raises an error
@@ -133,3 +109,46 @@ def test_motors_bus(request, motor_type, mock):
time.sleep(1) time.sleep(1)
new_values = motors_bus.read("Present_Position") new_values = motors_bus.read("Present_Position")
assert (new_values == values).all() assert (new_values == values).all()
@pytest.mark.parametrize("motor_type", available_motors)
@require_mock_motor
def test_find_port_mock(monkeypatch, motor_type):
from lerobot.common.robot_devices.motors.dynamixel import find_port
# To run find_port without user input
mock_builtins_input(monkeypatch)
with pytest.raises(OSError):
find_port()
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_find_port(request, motor_type):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()
@pytest.mark.parametrize("motor_type", available_motors)
@require_mock_motor
def test_configure_motors_all_ids_1_mock(monkeypatch, motor_type):
_test_configure_motors_all_ids_1(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_configure_motors_all_ids_1(request, motor_type):
_test_configure_motors_all_ids_1(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
@require_mock_motor
def test_motors_bus_mock(monkeypatch, motor_type):
_test_motors_bus(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_motors_bus(request, motor_type):
_test_motors_bus(motor_type)

View File

@@ -28,13 +28,12 @@ from pathlib import Path
import pytest import pytest
import torch import torch
from lerobot import available_robots
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import TEST_ROBOT_TYPES, make_robot, require_robot from tests.utils import make_robot, require_mock_robot, require_robot
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) def _test_robot(tmpdir, robot_type, mock):
@require_robot
def test_robot(tmpdir, request, robot_type, mock):
# TODO(rcadene): measure fps in nightly? # TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs # TODO(rcadene): test logs
# TODO(rcadene): add compatibility with other robots # TODO(rcadene): add compatibility with other robots
@@ -134,3 +133,15 @@ def test_robot(tmpdir, request, robot_type, mock):
for name in robot.cameras: for name in robot.cameras:
assert not robot.cameras[name].is_connected assert not robot.cameras[name].is_connected
del robot del robot
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_robot_mock(tmpdir, monkeypatch, robot_type):
_test_robot(tmpdir, robot_type, mock=True)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_robot(tmpdir, request, robot_type):
_test_robot(tmpdir, robot_type, mock=False)

View File

@@ -207,40 +207,15 @@ def require_robot(func):
# Access the pytest request context to get the mockeypatch fixture # Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request") request = kwargs.get("request")
robot_type = kwargs.get("robot_type") robot_type = kwargs.get("robot_type")
mock = kwargs.get("mock")
if robot_type is None: if robot_type is None:
raise ValueError("The 'robot_type' must be an argument of the test function.") raise ValueError("The 'robot_type' must be an argument of the test function.")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
if robot_type not in available_robots:
raise ValueError(
f"The camera type '{robot_type}' is not valid. Expected one of these '{available_robots}"
)
# Run test with a monkeypatched version of the robot devices.
if mock:
# TODO(rcadene): redesign mocking to not have this hardcoded logic
if robot_type in ["koch", "koch_bimanual"]:
camera_type = "opencv"
elif robot_type == "aloha":
camera_type = "intelrealsense"
else:
camera_type = "all"
mock_cameras(request, camera_type)
mock_motors(request)
# To run calibration without user input
monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr("builtins.input", mock_input)
# Run test with a real robot. Skip test if robot connection fails. # Run test with a real robot. Skip test if robot connection fails.
else: if not request.getfixturevalue("is_robot_available"):
if not is_robot_available(robot_type): pytest.skip(f"A {robot_type} robot is not available.")
pytest.skip(f"A {robot_type} robot is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
@@ -250,31 +225,16 @@ def require_robot(func):
def require_camera(func): def require_camera(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
# Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request") request = kwargs.get("request")
camera_type = kwargs.get("camera_type") camera_type = kwargs.get("camera_type")
mock = kwargs.get("mock")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if camera_type is None: if camera_type is None:
raise ValueError("The 'camera_type' must be an argument of the test function.") raise ValueError("The 'camera_type' must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
if camera_type not in available_cameras: if not request.getfixturevalue("is_camera_available"):
raise ValueError( pytest.skip(f"A {camera_type} camera is not available.")
f"The camera type '{camera_type}' is not valid. Expected one of these '{available_cameras}"
)
# Run test with a monkeypatched version of the robot devices.
if mock:
mock_cameras(request, camera_type)
# Run test with a real robot. Skip test if robot connection fails.
else:
if not is_camera_available(camera_type):
pytest.skip(f"A {camera_type} camera is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
@@ -287,44 +247,117 @@ def require_motor(func):
# Access the pytest request context to get the mockeypatch fixture # Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request") request = kwargs.get("request")
motor_type = kwargs.get("motor_type") motor_type = kwargs.get("motor_type")
mock = kwargs.get("mock")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if motor_type is None: if motor_type is None:
raise ValueError("The 'motor_type' must be an argument of the test function.") raise ValueError("The 'motor_type' must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
if motor_type not in available_motors: if not request.getfixturevalue("is_motor_available"):
raise ValueError( pytest.skip(f"A {motor_type} motor is not available.")
f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}"
)
# Run test with a monkeypatched version of the robot devices.
if mock:
mock_motors(request)
# Run test with a real robot. Skip test if robot connection fails.
else:
if not is_motor_available(motor_type):
pytest.skip(f"A {motor_type} motor is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper return wrapper
def mock_input(text=None): def require_mock_robot(func):
if text is not None: """
print(text) Decorator over test function to mock the robot
The decorated function must have two arguments `monkeypatch` and `robot_type`.
Example of usage:
```python
@pytest.mark.parametrize(
"robot_type", ["koch", "aloha"]
)
@require_robot
def test_require_robot(request, robot_type):
pass
```
"""
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the mockeypatch fixture
monkeypatch = kwargs.get("monkeypatch")
robot_type = kwargs.get("robot_type")
if monkeypatch is None:
raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.")
if robot_type is None:
raise ValueError("The 'robot_type' must be an argument of the test function.")
mock_robot(monkeypatch, robot_type)
return func(*args, **kwargs)
return wrapper
def mock_cameras(request, camera_type="all"): def require_mock_camera(func):
# TODO(rcadene): Redesign the mocking tests @wraps(func)
monkeypatch = request.getfixturevalue("monkeypatch") def wrapper(*args, **kwargs):
# Access the pytest request context to get the mockeypatch fixture
monkeypatch = kwargs.get("monkeypatch")
camera_type = kwargs.get("camera_type")
if camera_type in ["opencv", "all"]: if monkeypatch is None:
raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.")
if camera_type is None:
raise ValueError("The 'camera_type' must be an argument of the test function.")
mock_camera(monkeypatch, camera_type)
return func(*args, **kwargs)
return wrapper
def require_mock_motor(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the mockeypatch fixture
monkeypatch = kwargs.get("monkeypatch")
motor_type = kwargs.get("motor_type")
if monkeypatch is None:
raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.")
if motor_type is None:
raise ValueError("The 'motor_type' must be an argument of the test function.")
mock_motor(monkeypatch, motor_type)
return func(*args, **kwargs)
return wrapper
def mock_robot(monkeypatch, robot_type):
if robot_type not in available_robots:
raise ValueError(
f"The camera type '{robot_type}' is not valid. Expected one of these '{available_robots}"
)
if robot_type in ["koch", "koch_bimanual"]:
mock_camera(monkeypatch, "opencv")
mock_motor(monkeypatch, "dynamixel")
elif robot_type == "aloha":
mock_camera(monkeypatch, "intelrealsense")
mock_motor(monkeypatch, "dynamixel")
else:
raise NotImplementedError("Implement mocking logic for new robot.")
# To run calibration without user input
mock_builtins_input(monkeypatch)
def mock_camera(monkeypatch, camera_type):
if camera_type not in available_cameras:
raise ValueError(
f"The motor type '{camera_type}' is not valid. Expected one of these '{available_cameras}"
)
if camera_type == "opencv":
try: try:
import cv2 import cv2
@@ -335,7 +368,7 @@ def mock_cameras(request, camera_type="all"):
traceback.print_exc() traceback.print_exc()
pytest.skip("To avoid skipping tests mocking opencv cameras, run `pip install opencv-python`.") pytest.skip("To avoid skipping tests mocking opencv cameras, run `pip install opencv-python`.")
if camera_type in ["intelrealsense", "all"]: elif camera_type == "intelrealsense":
try: try:
import pyrealsense2 as rs import pyrealsense2 as rs
@@ -357,37 +390,52 @@ def mock_cameras(request, camera_type="all"):
pytest.skip( pytest.skip(
"To avoid skipping tests mocking intelrealsense cameras, run `pip install pyrealsense2`." "To avoid skipping tests mocking intelrealsense cameras, run `pip install pyrealsense2`."
) )
else:
raise NotImplementedError("Implement mocking logic for new camera.")
def mock_motors(request): def mock_motor(monkeypatch, motor_type):
# TODO(rcadene): Redesign the mocking tests if motor_type not in available_motors:
monkeypatch = request.getfixturevalue("monkeypatch") raise ValueError(
f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}"
try:
import dynamixel_sdk
from tests.mock_dynamixel import (
MockGroupSyncRead,
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
mock_convert_to_bytes,
) )
monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead) if motor_type == "dynamixel":
monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite) try:
monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler) import dynamixel_sdk
monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
# Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes from tests.mock_dynamixel import (
from lerobot.common.robot_devices.motors import dynamixel MockGroupSyncRead,
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
mock_convert_to_bytes,
)
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead)
# `convert_bytes_to_value` monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite)
monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes) monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler)
except ImportError: monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.") # Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes
from lerobot.common.robot_devices.motors import dynamixel
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value`
monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes)
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.")
else:
raise NotImplementedError("Implement mocking logic for new motor.")
def mock_builtins_input(monkeypatch):
def print_text(text=None):
if text is not None:
print(text)
monkeypatch.setattr("builtins.input", print_text)
def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot: def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot:
@@ -424,43 +472,3 @@ def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
else: else:
raise ValueError(f"The motor type '{motor_type}' is not valid.") raise ValueError(f"The motor type '{motor_type}' is not valid.")
def is_robot_available(robot_type):
try:
from lerobot.common.robot_devices.robots.factory import make_robot
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path)
robot = make_robot(robot_cfg)
robot.connect()
del robot
return True
except Exception:
traceback.print_exc()
print(f"\nA {robot_type} robot is not available.")
return False
def is_camera_available(camera_type):
try:
camera = make_camera(camera_type)
camera.connect()
del camera
return True
except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.")
return False
def is_motor_available(motor_type):
try:
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
del motors_bus
return True
except Exception:
traceback.print_exc()
print(f"\nA {motor_type} motor is not available.")
return False