This commit is contained in:
Remi Cadene
2024-09-10 18:30:39 +02:00
parent 44b8394365
commit 3bd5ea4d7a
8 changed files with 451 additions and 65 deletions

View File

@@ -20,7 +20,7 @@ from functools import wraps
import pytest
import torch
from lerobot import available_robots
from lerobot import available_cameras, available_motors, available_robots
from lerobot.common.utils.import_utils import is_package_available
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
@@ -31,6 +31,8 @@ DEFAULT_CONFIG_PATH = "lerobot/configs/default.yaml"
ROBOT_CONFIG_PATH_TEMPLATE = "lerobot/configs/robot/{robot}.yaml"
TEST_ROBOT_TYPES = available_robots + [f"mocked_{robot_type}" for robot_type in available_robots]
TEST_CAMERA_TYPES = available_cameras + [f"mocked_{camera_type}" for camera_type in available_cameras]
TEST_MOTOR_TYPES = available_motors + [f"mocked_{motor_type}" for motor_type in available_motors]
def require_x86_64_kernel(func):
@@ -178,32 +180,22 @@ def require_robot(func):
request = kwargs.get("request")
robot_type = kwargs.get("robot_type")
if robot_type is None:
raise ValueError("The 'robot_type' must be an argument of the test function.")
if robot_type not in TEST_ROBOT_TYPES:
raise ValueError(
f"The camera type '{robot_type}' is not valid. Expected one of these '{TEST_ROBOT_TYPES}"
)
if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.")
# Run test with a monkeypatched version of the robot devices.
if robot_type.startswith("mocked_"):
kwargs["robot_type"] = robot_type.replace("mocked_", "")
monkeypatch = request.getfixturevalue("monkeypatch")
try:
import cv2
from tests.mock_opencv import MockVideoCapture
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
except ImportError:
traceback.print_exc()
try:
import pyrealsense2 as rs
# TODO(rcadene):
mock_pipeline = None
monkeypatch.setattr(rs, "pipeline", mock_pipeline)
except ImportError:
traceback.print_exc()
mock_cameras(request)
mock_motors(request)
# Run test with a real robot. Skip test if robot connection fails.
else:
@@ -214,3 +206,117 @@ def require_robot(func):
return func(*args, **kwargs)
return wrapper
def require_camera(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_camera_available fixture
request = kwargs.get("request")
camera_type = kwargs.get("camera_type")
if camera_type is None:
raise ValueError("The 'camera_type' must be an argument of the test function.")
if camera_type not in TEST_CAMERA_TYPES:
raise ValueError(
f"The camera type '{camera_type}' is not valid. Expected one of these '{TEST_CAMERA_TYPES}"
)
if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.")
# Run test with a monkeypatched version of the robot devices.
if camera_type.startswith("mocked_"):
kwargs["camera_type"] = camera_type.replace("mocked_", "")
mock_cameras(request)
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_camera_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_camera_available"):
pytest.skip(f"A {camera_type} camera is not available.")
return func(*args, **kwargs)
return wrapper
def require_motor(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_motor_available fixture
request = kwargs.get("request")
motor_type = kwargs.get("motor_type")
if motor_type is None:
raise ValueError("The 'motor_type' must be an argument of the test function.")
if motor_type not in TEST_MOTOR_TYPES:
raise ValueError(
f"The motor type '{motor_type}' is not valid. Expected one of these '{TEST_MOTOR_TYPES}"
)
if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.")
# Run test with a monkeypatched version of the robot devices.
if motor_type.startswith("mocked_"):
kwargs["motor_type"] = motor_type.replace("mocked_", "")
mock_motors(request)
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_motor_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_motor_available"):
pytest.skip(f"A {motor_type} motor is not available.")
return func(*args, **kwargs)
return wrapper
def mock_cameras(request):
monkeypatch = request.getfixturevalue("monkeypatch")
try:
import cv2
from tests.mock_opencv import MockVideoCapture
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
except ImportError:
traceback.print_exc()
try:
import pyrealsense2 as rs
from tests.mock_intelrealsense import MockConfig, MockFormat, MockPipeline, MockStream
monkeypatch.setattr(rs, "config", MockConfig)
monkeypatch.setattr(rs, "pipeline", MockPipeline)
monkeypatch.setattr(rs, "stream", MockStream)
monkeypatch.setattr(rs, "format", MockFormat)
except ImportError:
traceback.print_exc()
def mock_motors(request):
monkeypatch = request.getfixturevalue("monkeypatch")
try:
import dynamixel_sdk
from tests.mock_dynamixel import (
MockGroupSyncRead,
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite)
monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler)
monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
except ImportError:
traceback.print_exc()