diff --git a/tests/conftest.py b/tests/conftest.py index 8c754cb3..ca8b75af 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,58 +13,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import traceback -import pytest -from lerobot.common.utils.utils import init_hydra_config -from tests.test_cameras import make_camera -from tests.test_motors import make_motors_bus -from tests.utils import DEVICE, ROBOT_CONFIG_PATH_TEMPLATE +from tests.utils import DEVICE def pytest_collection_finish(): print(f"\nTesting with {DEVICE=}") - - -@pytest.fixture -def is_robot_available(robot_type): - try: - from lerobot.common.robot_devices.robots.factory import make_robot - - config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) - robot_cfg = init_hydra_config(config_path) - robot = make_robot(robot_cfg) - robot.connect() - del robot - return True - except Exception: - traceback.print_exc() - print(f"\nA {robot_type} robot is not available.") - return False - - -@pytest.fixture -def is_camera_available(camera_type): - try: - camera = make_camera(camera_type) - camera.connect() - del camera - return True - except Exception: - traceback.print_exc() - print(f"\nA {camera_type} camera is not available.") - return False - - -@pytest.fixture -def is_motor_available(motor_type): - try: - motors_bus = make_motors_bus(motor_type) - motors_bus.connect() - del motors_bus - return True - except Exception: - traceback.print_exc() - print(f"\nA {motor_type} motor is not available.") - return False diff --git a/tests/test_cameras.py b/tests/test_cameras.py index a5d30f6c..a2f2a42c 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -21,17 +21,11 @@ pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]' ``` """ -import os - import numpy as np import pytest from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from tests.utils import TEST_CAMERA_TYPES, require_camera - -# Camera indices used for connecting physical cameras -OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0)) -INTELREALSENSE_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_CAMERA_INDEX", 128422271614)) +from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera # Maximum absolute difference between two consecutive images recored by a camera. # This value differs with respect to the camera. @@ -42,23 +36,6 @@ def compute_max_pixel_difference(first_image, second_image): return np.abs(first_image.astype(float) - second_image.astype(float)).max() -def make_camera(camera_type, **kwargs): - if camera_type == "opencv": - from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera - - camera_index = kwargs.pop("camera_index", OPENCV_CAMERA_INDEX) - return OpenCVCamera(camera_index, **kwargs) - - elif camera_type == "intelrealsense": - from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera - - camera_index = kwargs.pop("camera_index", INTELREALSENSE_CAMERA_INDEX) - return IntelRealSenseCamera(camera_index, **kwargs) - - else: - raise ValueError(f"The camera type '{camera_type}' is not valid.") - - @pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES) @require_camera def test_camera(request, camera_type, mock): diff --git a/tests/test_motors.py b/tests/test_motors.py index 10380330..672ecc6f 100644 --- a/tests/test_motors.py +++ b/tests/test_motors.py @@ -30,31 +30,8 @@ import time import numpy as np import pytest -from lerobot.common.robot_devices.motors.utils import MotorsBus from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from tests.utils import TEST_MOTOR_TYPES, mock_input, require_motor - -DYNAMIXEL_PORT = "/dev/tty.usbmodem575E0032081" -DYNAMIXEL_MOTORS = { - "shoulder_pan": [1, "xl430-w250"], - "shoulder_lift": [2, "xl430-w250"], - "elbow_flex": [3, "xl330-m288"], - "wrist_flex": [4, "xl330-m288"], - "wrist_roll": [5, "xl330-m288"], - "gripper": [6, "xl330-m288"], -} - - -def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus: - if motor_type == "dynamixel": - from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus - - port = kwargs.pop("port", DYNAMIXEL_PORT) - motors = kwargs.pop("motors", DYNAMIXEL_MOTORS) - return DynamixelMotorsBus(port, motors, **kwargs) - - else: - raise ValueError(f"The motor type '{motor_type}' is not valid.") +from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, mock_input, require_motor @pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) diff --git a/tests/test_robots.py b/tests/test_robots.py index ebba8d38..cd49049f 100644 --- a/tests/test_robots.py +++ b/tests/test_robots.py @@ -28,18 +28,8 @@ from pathlib import Path import pytest import torch -from lerobot.common.robot_devices.robots.factory import make_robot as make_robot_from_cfg -from lerobot.common.robot_devices.robots.utils import Robot from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from lerobot.common.utils.utils import init_hydra_config -from tests.utils import ROBOT_CONFIG_PATH_TEMPLATE, TEST_ROBOT_TYPES, require_robot - - -def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot: - config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) - robot_cfg = init_hydra_config(config_path, overrides) - robot = make_robot_from_cfg(robot_cfg) - return robot +from tests.utils import TEST_ROBOT_TYPES, make_robot, require_robot @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) diff --git a/tests/utils.py b/tests/utils.py index dd2c904e..cff1970e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import platform import traceback from functools import wraps @@ -21,7 +22,12 @@ import pytest import torch from lerobot import available_cameras, available_motors, available_robots +from lerobot.common.robot_devices.cameras.utils import Camera +from lerobot.common.robot_devices.motors.utils import MotorsBus +from lerobot.common.robot_devices.robots.factory import make_robot as make_robot_from_cfg +from lerobot.common.robot_devices.robots.utils import Robot from lerobot.common.utils.import_utils import is_package_available +from lerobot.common.utils.utils import init_hydra_config DEVICE = "cuda" if torch.cuda.is_available() else "cpu" @@ -42,6 +48,20 @@ TEST_MOTOR_TYPES = [] for motor_type in available_motors: TEST_MOTOR_TYPES += [(motor_type, True), (motor_type, False)] +# Camera indices used for connecting physical cameras +OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0)) +INTELREALSENSE_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_CAMERA_INDEX", 128422271614)) + +DYNAMIXEL_PORT = "/dev/tty.usbmodem575E0032081" +DYNAMIXEL_MOTORS = { + "shoulder_pan": [1, "xl430-w250"], + "shoulder_lift": [2, "xl430-w250"], + "elbow_flex": [3, "xl330-m288"], + "wrist_flex": [4, "xl330-m288"], + "wrist_roll": [5, "xl330-m288"], + "gripper": [6, "xl330-m288"], +} + def require_x86_64_kernel(func): """ @@ -184,7 +204,7 @@ def require_robot(func): @wraps(func) def wrapper(*args, **kwargs): - # Access the pytest request context to get the is_robot_available fixture + # Access the pytest request context to get the mockeypatch fixture request = kwargs.get("request") robot_type = kwargs.get("robot_type") mock = kwargs.get("mock") @@ -219,8 +239,7 @@ def require_robot(func): # Run test with a real robot. Skip test if robot connection fails. else: - # `is_robot_available` is defined in `tests/conftest.py` - if not request.getfixturevalue("is_robot_available"): + if not is_robot_available(robot_type): pytest.skip(f"A {robot_type} robot is not available.") return func(*args, **kwargs) @@ -231,7 +250,7 @@ def require_robot(func): def require_camera(func): @wraps(func) def wrapper(*args, **kwargs): - # Access the pytest request context to get the is_camera_available fixture + # Access the pytest request context to get the mockeypatch fixture request = kwargs.get("request") camera_type = kwargs.get("camera_type") mock = kwargs.get("mock") @@ -254,8 +273,7 @@ def require_camera(func): # Run test with a real robot. Skip test if robot connection fails. else: - # `is_camera_available` is defined in `tests/conftest.py` - if not request.getfixturevalue("is_camera_available"): + if not is_camera_available(camera_type): pytest.skip(f"A {camera_type} camera is not available.") return func(*args, **kwargs) @@ -266,7 +284,7 @@ def require_camera(func): def require_motor(func): @wraps(func) def wrapper(*args, **kwargs): - # Access the pytest request context to get the is_motor_available fixture + # Access the pytest request context to get the mockeypatch fixture request = kwargs.get("request") motor_type = kwargs.get("motor_type") mock = kwargs.get("mock") @@ -289,8 +307,7 @@ def require_motor(func): # Run test with a real robot. Skip test if robot connection fails. else: - # `is_motor_available` is defined in `tests/conftest.py` - if not request.getfixturevalue("is_motor_available"): + if not is_motor_available(motor_type): pytest.skip(f"A {motor_type} motor is not available.") return func(*args, **kwargs) @@ -371,3 +388,79 @@ def mock_motors(request): except ImportError: traceback.print_exc() pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.") + + +def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot: + config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) + robot_cfg = init_hydra_config(config_path, overrides) + robot = make_robot_from_cfg(robot_cfg) + return robot + + +def make_camera(camera_type, **kwargs) -> Camera: + if camera_type == "opencv": + from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera + + camera_index = kwargs.pop("camera_index", OPENCV_CAMERA_INDEX) + return OpenCVCamera(camera_index, **kwargs) + + elif camera_type == "intelrealsense": + from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera + + camera_index = kwargs.pop("camera_index", INTELREALSENSE_CAMERA_INDEX) + return IntelRealSenseCamera(camera_index, **kwargs) + + else: + raise ValueError(f"The camera type '{camera_type}' is not valid.") + + +def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus: + if motor_type == "dynamixel": + from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus + + port = kwargs.pop("port", DYNAMIXEL_PORT) + motors = kwargs.pop("motors", DYNAMIXEL_MOTORS) + return DynamixelMotorsBus(port, motors, **kwargs) + + else: + raise ValueError(f"The motor type '{motor_type}' is not valid.") + + +def is_robot_available(robot_type): + try: + from lerobot.common.robot_devices.robots.factory import make_robot + + config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) + robot_cfg = init_hydra_config(config_path) + robot = make_robot(robot_cfg) + robot.connect() + del robot + return True + except Exception: + traceback.print_exc() + print(f"\nA {robot_type} robot is not available.") + return False + + +def is_camera_available(camera_type): + try: + camera = make_camera(camera_type) + camera.connect() + del camera + return True + except Exception: + traceback.print_exc() + print(f"\nA {camera_type} camera is not available.") + return False + + +def is_motor_available(motor_type): + try: + motors_bus = make_motors_bus(motor_type) + motors_bus.connect() + del motors_bus + return True + except Exception: + traceback.print_exc() + print(f"\nA {motor_type} motor is not available.") + return False