fix unit tests

This commit is contained in:
Remi Cadene
2024-09-26 13:51:45 +02:00
parent a236382590
commit 8b36223832
5 changed files with 106 additions and 117 deletions

View File

@@ -13,58 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
import pytest
from lerobot.common.utils.utils import init_hydra_config
from tests.test_cameras import make_camera
from tests.test_motors import make_motors_bus
from tests.utils import DEVICE, ROBOT_CONFIG_PATH_TEMPLATE
from tests.utils import DEVICE
def pytest_collection_finish():
print(f"\nTesting with {DEVICE=}")
@pytest.fixture
def is_robot_available(robot_type):
try:
from lerobot.common.robot_devices.robots.factory import make_robot
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path)
robot = make_robot(robot_cfg)
robot.connect()
del robot
return True
except Exception:
traceback.print_exc()
print(f"\nA {robot_type} robot is not available.")
return False
@pytest.fixture
def is_camera_available(camera_type):
try:
camera = make_camera(camera_type)
camera.connect()
del camera
return True
except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.")
return False
@pytest.fixture
def is_motor_available(motor_type):
try:
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
del motors_bus
return True
except Exception:
traceback.print_exc()
print(f"\nA {motor_type} motor is not available.")
return False

View File

@@ -21,17 +21,11 @@ pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]'
```
"""
import os
import numpy as np
import pytest
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import TEST_CAMERA_TYPES, require_camera
# Camera indices used for connecting physical cameras
OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0))
INTELREALSENSE_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_CAMERA_INDEX", 128422271614))
from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera
# Maximum absolute difference between two consecutive images recored by a camera.
# This value differs with respect to the camera.
@@ -42,23 +36,6 @@ def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max()
def make_camera(camera_type, **kwargs):
if camera_type == "opencv":
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
camera_index = kwargs.pop("camera_index", OPENCV_CAMERA_INDEX)
return OpenCVCamera(camera_index, **kwargs)
elif camera_type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
camera_index = kwargs.pop("camera_index", INTELREALSENSE_CAMERA_INDEX)
return IntelRealSenseCamera(camera_index, **kwargs)
else:
raise ValueError(f"The camera type '{camera_type}' is not valid.")
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_camera(request, camera_type, mock):

View File

@@ -30,31 +30,8 @@ import time
import numpy as np
import pytest
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import TEST_MOTOR_TYPES, mock_input, require_motor
DYNAMIXEL_PORT = "/dev/tty.usbmodem575E0032081"
DYNAMIXEL_MOTORS = {
"shoulder_pan": [1, "xl430-w250"],
"shoulder_lift": [2, "xl430-w250"],
"elbow_flex": [3, "xl330-m288"],
"wrist_flex": [4, "xl330-m288"],
"wrist_roll": [5, "xl330-m288"],
"gripper": [6, "xl330-m288"],
}
def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
if motor_type == "dynamixel":
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
port = kwargs.pop("port", DYNAMIXEL_PORT)
motors = kwargs.pop("motors", DYNAMIXEL_MOTORS)
return DynamixelMotorsBus(port, motors, **kwargs)
else:
raise ValueError(f"The motor type '{motor_type}' is not valid.")
from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, mock_input, require_motor
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)

View File

@@ -28,18 +28,8 @@ from pathlib import Path
import pytest
import torch
from lerobot.common.robot_devices.robots.factory import make_robot as make_robot_from_cfg
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.utils.utils import init_hydra_config
from tests.utils import ROBOT_CONFIG_PATH_TEMPLATE, TEST_ROBOT_TYPES, require_robot
def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot:
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path, overrides)
robot = make_robot_from_cfg(robot_cfg)
return robot
from tests.utils import TEST_ROBOT_TYPES, make_robot, require_robot
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)

View File

@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import platform
import traceback
from functools import wraps
@@ -21,7 +22,12 @@ import pytest
import torch
from lerobot import available_cameras, available_motors, available_robots
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.robots.factory import make_robot as make_robot_from_cfg
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.utils.import_utils import is_package_available
from lerobot.common.utils.utils import init_hydra_config
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
@@ -42,6 +48,20 @@ TEST_MOTOR_TYPES = []
for motor_type in available_motors:
TEST_MOTOR_TYPES += [(motor_type, True), (motor_type, False)]
# Camera indices used for connecting physical cameras
OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0))
INTELREALSENSE_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_CAMERA_INDEX", 128422271614))
DYNAMIXEL_PORT = "/dev/tty.usbmodem575E0032081"
DYNAMIXEL_MOTORS = {
"shoulder_pan": [1, "xl430-w250"],
"shoulder_lift": [2, "xl430-w250"],
"elbow_flex": [3, "xl330-m288"],
"wrist_flex": [4, "xl330-m288"],
"wrist_roll": [5, "xl330-m288"],
"gripper": [6, "xl330-m288"],
}
def require_x86_64_kernel(func):
"""
@@ -184,7 +204,7 @@ def require_robot(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_robot_available fixture
# Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request")
robot_type = kwargs.get("robot_type")
mock = kwargs.get("mock")
@@ -219,8 +239,7 @@ def require_robot(func):
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_robot_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_robot_available"):
if not is_robot_available(robot_type):
pytest.skip(f"A {robot_type} robot is not available.")
return func(*args, **kwargs)
@@ -231,7 +250,7 @@ def require_robot(func):
def require_camera(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_camera_available fixture
# Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request")
camera_type = kwargs.get("camera_type")
mock = kwargs.get("mock")
@@ -254,8 +273,7 @@ def require_camera(func):
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_camera_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_camera_available"):
if not is_camera_available(camera_type):
pytest.skip(f"A {camera_type} camera is not available.")
return func(*args, **kwargs)
@@ -266,7 +284,7 @@ def require_camera(func):
def require_motor(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_motor_available fixture
# Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request")
motor_type = kwargs.get("motor_type")
mock = kwargs.get("mock")
@@ -289,8 +307,7 @@ def require_motor(func):
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_motor_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_motor_available"):
if not is_motor_available(motor_type):
pytest.skip(f"A {motor_type} motor is not available.")
return func(*args, **kwargs)
@@ -371,3 +388,79 @@ def mock_motors(request):
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.")
def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot:
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path, overrides)
robot = make_robot_from_cfg(robot_cfg)
return robot
def make_camera(camera_type, **kwargs) -> Camera:
if camera_type == "opencv":
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
camera_index = kwargs.pop("camera_index", OPENCV_CAMERA_INDEX)
return OpenCVCamera(camera_index, **kwargs)
elif camera_type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
camera_index = kwargs.pop("camera_index", INTELREALSENSE_CAMERA_INDEX)
return IntelRealSenseCamera(camera_index, **kwargs)
else:
raise ValueError(f"The camera type '{camera_type}' is not valid.")
def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
if motor_type == "dynamixel":
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
port = kwargs.pop("port", DYNAMIXEL_PORT)
motors = kwargs.pop("motors", DYNAMIXEL_MOTORS)
return DynamixelMotorsBus(port, motors, **kwargs)
else:
raise ValueError(f"The motor type '{motor_type}' is not valid.")
def is_robot_available(robot_type):
try:
from lerobot.common.robot_devices.robots.factory import make_robot
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path)
robot = make_robot(robot_cfg)
robot.connect()
del robot
return True
except Exception:
traceback.print_exc()
print(f"\nA {robot_type} robot is not available.")
return False
def is_camera_available(camera_type):
try:
camera = make_camera(camera_type)
camera.connect()
del camera
return True
except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.")
return False
def is_motor_available(motor_type):
try:
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
del motors_bus
return True
except Exception:
traceback.print_exc()
print(f"\nA {motor_type} motor is not available.")
return False